Message Queue

I wrote some code for a group project that I am kind of proud of. It’s not very clean code, but it accomplishes something cool: it lets a website send messages to a browser in real time, without the browser having to constantly poll the website to check whether a message is ready to be sent.

We did most of our project in PHP. Here is our PHP code:

<?php
include_once("json.php");

/**
 * Fetch a URL with the command-line curl client.
 *
 * @param string $url URL to request.
 * @return string First line of curl's output, or "" if curl printed nothing.
 */
function get_url($url) {
    $output = array();
    // escapeshellarg() stops request-derived text (such as the client id)
    // from being interpreted by the shell. exec() already takes $output by
    // reference; writing "&$output" at the call site is a fatal error
    // since PHP 5.4.
    exec("curl " . escapeshellarg($url), $output);
    return isset($output[0]) ? $output[0] : "";
}

/**
 * Register a new client with the message-queue server.
 *
 * @return string The id the server assigned to the new client.
 */
function msgq_new() {
    return get_url("http://127.0.0.1:8888/new/");
}

/**
 * Post $data, JSON-encoded, to the message-queue server for broadcast.
 *
 * @param array $data Message payload.
 * @return string The server's response.
 */
function send_message($data) {
    return get_url("http://127.0.0.1:8888/post/" . urlencode(array2json($data)));
}

// Long-poll endpoint: relay the browser's wait request to the queue server.
if (isset($_GET['action']) && $_GET['action'] == "wait_for_message") {
    header("Content-type: text/plain");
    $id = isset($_GET['id']) ? $_GET['id'] : "";
    // PHP has already URL-decoded $_GET, so re-encode before building the URL.
    echo get_url("http://127.0.0.1:8888/wait/" . urlencode($id));
}

Here is the JavaScript part. We were using the YUI library, but you could easily do this without it:

// Long-poll the server for the next message and dispatch it.
// Each response immediately triggers the next request, so one request is
// always outstanding. A message is JSON whose `handler` field names the
// global function that receives the parsed object.
function wait_for_message() {
  var id = document.body.id;
  var url = '/wait_for_message.php?id=' + encodeURIComponent(id);
  YAHOO.util.Connect.asyncRequest('GET', url, {
    success: function(response) {
      // Re-arm first so polling continues even if handling below throws.
      wait_for_message();
      if (!response.responseText || response.responseText == "\n") return; // Server sent a nop
      var data = YAHOO.lang.JSON.parse(response.responseText);
      window[data.handler](data);
    },
    failure: function() {
      // Without a failure callback a single failed request would end
      // polling for good. Retry after a short pause so an unreachable
      // server is not hammered.
      setTimeout(wait_for_message, 5000);
    }
  });
}
// Begin polling once the page has loaded.
YAHOO.util.Event.addListener(window, 'load', wait_for_message);

Here is the Python part:

from time import time
from random import uniform
from Queue import Queue,Empty
from socket import error
from threading import Thread
from urllib import unquote_plus
from wsgiref.simple_server import make_server


# Registry of known clients, keyed by client id. Each value is a Server
# instance, which carries that client's message Queue.
clients={} # A Queue and a http server thread for each client

def send_to_all(msg):
    """Queue ``msg`` for every registered client, dropping stale ones.

    A client whose ``last_get`` timestamp is more than 300 seconds old is
    marked inactive and removed from ``clients`` instead of receiving the
    message.

    Args:
        msg: the message to put on each remaining client's queue.
    """
    print("sending %r to all" % msg)
    # Iterate over a snapshot so entries can be deleted inside the loop.
    for client_id, client in list(clients.items()):
        if time() - client.last_get > 300:
            # Client has not asked for any messages for 5 minutes.
            # Delete them.
            client.active = False
            del clients[client_id]
            continue
        print("sending to %s" % client_id)
        client.q.put(msg)
    print("finished sending messages.")

def wait_for_message(q):
    """Block until ``q`` yields a message and return it.

    The wait lasts a random 55-59 seconds at most. If the queue is still
    empty after that, an empty string is returned, which serves as a
    keep-alive response.
    """
    timeout = uniform(55, 59)
    try:
        message = q.get(True, timeout)
    except Empty:
        # Nothing arrived in time: answer with an empty keep-alive.
        message = ""
    return message

def handle(environ, start_response):
    """WSGI application for the message-queue server.

    Routes on ``PATH_INFO``:

    * ``/wait/<id>`` -- register client ``<id>`` if it is unknown, refresh
      its ``last_get`` timestamp, then block until a message is queued for
      it. The body is that message, or empty if the wait timed out.
    * ``/new`` -- register a new client; the body is its generated id.
    * ``/post/<msg>`` -- send the decoded ``<msg>`` to all clients; the
      body is empty.

    Any other path gets an empty body. Every response is ``200 OK`` with
    content type ``text/plain``.

    Args:
        environ: WSGI environment; only ``PATH_INFO`` is read.
        start_response: WSGI ``start_response`` callable.

    Returns:
        A one-element list holding the response body. A bare string would
        also be iterable, but the server would then write it out one
        character at a time.
    """
    start_response('200 OK', [('Content-type', 'text/plain')])
    path = environ['PATH_INFO']

    if path.startswith("/wait/"):
        client_id = unquote_plus(path[len("/wait/"):])
        # Keep a local reference so the client object stays usable even if
        # it is removed from ``clients`` while this request is waiting.
        client = clients.get(client_id)
        if client is None:
            client = clients[client_id] = Server()
        client.last_get = time()
        print("%s is waiting for a message..." % client_id)
        return [wait_for_message(client.q)]

    if path.startswith("/new"):
        # A random UUID avoids the collisions a timestamp hash produces when
        # two clients register within the same clock tick. The id is still
        # 32 hex characters.
        from uuid import uuid4
        client_id = uuid4().hex
        clients[client_id] = Server()
        return [client_id]

    if path.startswith("/post/"):
        # NOTE(review): wsgiref appears to percent-decode PATH_INFO already,
        # so this second decode may alter a literal '+' or '%xx' inside the
        # message -- confirm against the server in use.
        msg = unquote_plus(path[len("/post/"):])
        send_to_all(msg)

    return [""]

class Server(Thread):
    """A Queue and a http server thread for each client.

    Attributes:
        q: Queue holding this client's messages.
        active: loop flag; once it is False, ``run`` exits after the request
            it is currently handling.
        last_get: ``time()`` value set at construction; other code may
            refresh it.
        httpd: not set by this class. ``run`` expects it to be available,
            e.g. assigned on the class before any instance is created.
    """

    def __init__(self):
        Thread.__init__(self)
        self.q = Queue()
        # Daemon thread: it must not keep the process alive on exit.
        self.setDaemon(True)
        self.active = True
        self.last_get = time()
        # The thread starts serving as soon as the object is constructed.
        self.start()

    def run(self):
        """Handle HTTP requests one at a time until ``active`` is cleared."""
        while self.active:
            self.httpd.handle_request()

def start(host='0.0.0.0', port=8888):
    """Create the WSGI server, share it with Server threads, and serve.

    Args:
        host: interface to bind. The default '0.0.0.0' listens on every
            interface; pass '127.0.0.1' to accept local connections only.
        port: TCP port to listen on.

    This call blocks in ``serve_forever()``.
    """
    httpd = make_server(host, port, handle)
    # Server threads read ``httpd`` from the class, so publish it before
    # any request can create one.
    Server.httpd = httpd
    httpd.serve_forever()

# Launch the server when this file is executed; start() blocks in
# serve_forever().
start()
Advertisements