On Fri, Dec 25, 2009 at 9:03 PM, Brian Jones <bkjo...@gmail.com> wrote: > Hi all, > I'm having a design issue that's really bothering me. The code I'm writing > is fairly large by now, but I've written what I think is a decent example > that illustrates my problem. > My app launches threads that each consume messages from a queue, send them > to a processor object, and then the processor needs to return the message to > the main thread -- if the processing was successful -- and then the main > thread puts the message into a different queue. > Here's a shortened version of my code. > > class Processor(object): > def __init__(self): > self.qhost = 'localhost' > self.qport = '5672' > self.uname = 'guest' > self.pwd = 'guest' > self.ssl = false > self.vhost = '/' > self.exch_name = 'fooX' > self.exch_type = 'direct' > self.queue_name = 'fooQ' > self.conn = amqp.Connection(userid=self.uname, password=self.pwd, > host=self.qhost, > virtual_host=self.vhost, ssl=self.ssl) > self.chan = self.conn.channel() > self.chan.exchange_declare(self.exch_name, type=self.exch_type) > self.chan.queue_declare(self.qname) > self.chan.queue_bind(self.qname, self.exch_name) > > def consume(self, callback): > self.chan.basic_consume(self.qname, callback=callback) > while True: > self.chan.wait() > > > class Munger(object): > def munge(self,msg): > if msg % 2 == 0: > yield msg
Why is this a generator function? From what I understand looking at channel.py in py-amqplib this should just be a callable that processes the message. If you want to notify the Sender that the message has been processed, I would either make another Queue to pass messages back, or give the Munger a callback function to pass successful messages to. The difference between them is that a Queue is asynchronous and you would have to process the output in another thread; a callback is synchronous and will run in the same thread as the munger. Either of these can be handled in a base class leaving subclasses to just implement the message handling. For example, using a callback (a Queue would be similar): class MungerBase(object): def __init__(self, callback): self.callback = callback def munge(self, msg): if self.process(msg): self.callback(msg) A real munger looks like this: class Munger(MungerBase): def process(self, msg): if msg % 2 == 0: return True return False > class Sender(object): > def run(self): > p = Processor() > m = Munger() > for msg in p.consume(m.munge): > """ > I know this doesn't work right now. This > piece of the code should send 'msg' to another queue. > > """ > pass This would look like class Sender(object): def run(self): p = Processor() m = Munger(self.handle_processed_message) p.consume(m.munge) def handle_processed_message(self, msg): # send 'msg' to another queue or whatever Kent _______________________________________________ Tutor maillist - Tutor@python.org To unsubscribe or change subscription options: http://mail.python.org/mailman/listinfo/tutor