Hi all, I'm having a design issue that's really bothering me. The code I'm writing is fairly large by now, but I've written what I think is a decent example that illustrates my problem.
My app launches threads that each consume messages from a queue and send them to a processor object. If the processing was successful, the processor needs to return the message to the main thread, and the main thread then puts the message into a different queue. Here's a shortened version of my code.

    from amqplib import client_0_8 as amqp  # py-amqplib

    class Processor(object):
        def __init__(self):
            self.qhost = 'localhost'
            self.qport = '5672'
            self.uname = 'guest'
            self.pwd = 'guest'
            self.ssl = False
            self.vhost = '/'
            self.exch_name = 'fooX'
            self.exch_type = 'direct'
            self.qname = 'fooQ'
            self.conn = amqp.Connection(userid=self.uname, password=self.pwd,
                                        host=self.qhost, virtual_host=self.vhost,
                                        ssl=self.ssl)
            self.chan = self.conn.channel()
            self.chan.exchange_declare(self.exch_name, type=self.exch_type)
            self.chan.queue_declare(self.qname)
            self.chan.queue_bind(self.qname, self.exch_name)

        def consume(self, callback):
            # Register the callback, then block forever waiting for messages.
            self.chan.basic_consume(self.qname, callback=callback)
            while True:
                self.chan.wait()

    class Munger(object):
        def munge(self, msg):
            if msg % 2 == 0:
                yield msg

    class Sender(object):
        def run(self):
            p = Processor()
            m = Munger()
            for msg in p.consume(m.munge):
                # I know this doesn't work right now. This piece of the
                # code should send 'msg' to another queue.
                pass

    if __name__ == '__main__':
        s = Sender()
        s.run()

The problem might be obvious to you, but I'll quickly explain: the Sender object (a thread in the real code) calls p.consume, which just wraps the basic_consume method in py-amqplib. The basic_consume method registers a consumer and a callback with the AMQP server. The chan.wait() call blocks and waits for messages. When messages arrive, they are passed to the Python callable we passed to basic_consume. Therein lies the problem.
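To make that concrete without needing a broker, here is a minimal sketch of the same flow. FakeChannel is a made-up stand-in written only for illustration (it is not part of py-amqplib and merely mimics the register-then-wait pattern), and the messages are plain integers. It shows that whatever the callback returns is simply dropped, and since munge contains a yield, calling it only creates a generator that nobody ever iterates:

```python
class FakeChannel(object):
    """Hypothetical stand-in for an AMQP channel; not the py-amqplib API."""

    def __init__(self, messages):
        self.pending = list(messages)
        self.callback = None

    def basic_consume(self, queue, callback):
        # Only remembers the callable, like registering a consumer.
        self.callback = callback

    def wait(self):
        # Delivers one message to the callback and ignores the return value.
        self.callback(self.pending.pop(0))


class Munger(object):
    def __init__(self):
        self.body_ran = False

    def munge(self, msg):
        self.body_ran = True  # only executes if the generator is iterated
        if msg % 2 == 0:
            yield msg


chan = FakeChannel([1, 2, 3, 4])
m = Munger()
chan.basic_consume('fooQ', callback=m.munge)
while chan.pending:
    chan.wait()

print(m.body_ran)
```

All four messages get delivered, yet body_ran stays False: nothing comes back out of the callback, so Sender has nothing to loop over.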
Messages go from my Processor object to the Munger object, and this is all initiated by the Sender object, but I can't find a clean way to get the messages that Munger processed successfully back to Sender, so that Sender can requeue them to the new queue. I've thought about having a Queue object in Sender, or maybe registering Sender as an observer of (at different times) the Munger or Processor objects, but I'd like this to be as simple and understandable as possible for other developers, because I want to make the Processor and Munger objects pluggable (so the Processor can support different queuing protocols, Munger can do literally anything, etc.).

Ideas welcome. If I've failed to explain some aspect of my issue, give me a poke and I'll expand as best I can.

brian

Brian K. Jones
Python Magazine  http://www.pythonmagazine.com
My Blog          http://www.protocolostomy.com
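P.S. In case it helps frame the discussion, here is a rough sketch of the Queue idea mentioned above. It rests on a few assumptions: FakeProcessor is a hypothetical stand-in that feeds integers to the callback instead of talking to a broker, its consume() returns once the messages run out instead of blocking forever, and the names on_message and outbox are made up. Munger returns the message (or None) instead of yielding, and Sender wraps that in its own callback, so neither Processor nor Munger has to know about Sender:

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


class FakeProcessor(object):
    """Hypothetical stand-in for Processor; feeds integers, no broker."""

    def __init__(self, messages):
        self.messages = list(messages)

    def consume(self, callback):
        # The real consume() would loop on chan.wait(); this one just
        # hands each message to the callback and then returns.
        for msg in self.messages:
            callback(msg)


class Munger(object):
    def munge(self, msg):
        # Return the message on success, None otherwise (no yield).
        if msg % 2 == 0:
            return msg
        return None


class Sender(object):
    def __init__(self, processor, munger):
        self.processor = processor
        self.munger = munger
        self.outbox = queue.Queue()  # successfully munged messages

    def on_message(self, msg):
        # Registered as the consumer callback; the pluggable pieces
        # stay unaware of Sender.
        result = self.munger.munge(msg)
        if result is not None:
            self.outbox.put(result)

    def run(self):
        self.processor.consume(self.on_message)


s = Sender(FakeProcessor(range(6)), Munger())
s.run()

# Drain the outbox; in the real code this is where requeueing would happen.
requeued = []
while not s.outbox.empty():
    requeued.append(s.outbox.get())
print(requeued)
```

In the real threaded code the consuming thread would do the put() and the main thread the get(). Whether a None check is good enough to signal "not processed" for arbitrary Mungers is still an open question for me.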
_______________________________________________
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
http://mail.python.org/mailman/listinfo/tutor