Re: [Tutor] oo design/interaction quandary
"Brian Jones" wrote My app launches threads that each consume messages from a queue, send them to a processor object, and then the processor needs to return the message to the main thread -- if the processing was successful -- and then the main thread puts the message into a different queue. Why don't the messages munge and process themselves? What is the purpose of the processor and munger classes? Aren't they just functions and therefore should e metjods of the thing upon which they operate - ie Message? HTH, -- Alan Gauld Author of the Learn to Program web site http://www.alan-g.me.uk/ ___ Tutor maillist - Tutor@python.org To unsubscribe or change subscription options: http://mail.python.org/mailman/listinfo/tutor
Re: [Tutor] oo design/interaction quandary
On Fri, Dec 25, 2009 at 9:03 PM, Brian Jones wrote: > Hi all, > I'm having a design issue that's really bothering me. The code I'm writing > is fairly large by now, but I've written what I think is a decent example > that illustrates my problem. > My app launches threads that each consume messages from a queue, send them > to a processor object, and then the processor needs to return the message to > the main thread -- if the processing was successful -- and then the main > thread puts the message into a different queue. > Here's a shortened version of my code. > > class Processor(object): > def __init__(self): > self.qhost = 'localhost' > self.qport = '5672' > self.uname = 'guest' > self.pwd = 'guest' > self.ssl = false > self.vhost = '/' > self.exch_name = 'fooX' > self.exch_type = 'direct' > self.queue_name = 'fooQ' > self.conn = amqp.Connection(userid=self.uname, password=self.pwd, > host=self.qhost, > virtual_host=self.vhost, ssl=self.ssl) > self.chan = self.conn.channel() > self.chan.exchange_declare(self.exch_name, type=self.exch_type) > self.chan.queue_declare(self.qname) > self.chan.queue_bind(self.qname, self.exch_name) > > def consume(self, callback): > self.chan.basic_consume(self.qname, callback=callback) > while True: > self.chan.wait() > > > class Munger(object): > def munge(self,msg): > if msg % 2 == 0: > yield msg Why is this a generator function? From what I understand looking at channel.py in py-amqplib this should just be a callable that processes the message. If you want to notify the Sender that the message has been processed, I would either make another Queue to pass messages back, or give the Munger a callback function to pass successful messages to. The difference between them is that a Queue is asynchronous and you would have to process the output in another thread; a callback is synchronous and will run in the same thread as the munger. Either of these can be handled in a base class leaving subclasses to just implement the message handling. For example, using a callback (a Queue would be similar): class MungerBase(object): def __init__(self, callback): self.callback = callback def munge(self, msg): if self.process(msg): self.callback(msg) A real munger looks like this: class Munger(MungerBase): def process(self, msg): if msg % 2 == 0: return True return False > class Sender(object): > def run(self): > p = Processor() > m = Munger() > for msg in p.consume(m.munge): > """ > I know this doesn't work right now. This > piece of the code should send 'msg' to another queue. > > """ > pass This would look like class Sender(object): def run(self): p = Processor() m = Munger(self.handle_processed_message) p.consume(m.munge) def handle_processed_message(self, msg): # send 'msg' to another queue or whatever Kent ___ Tutor maillist - Tutor@python.org To unsubscribe or change subscription options: http://mail.python.org/mailman/listinfo/tutor
[Tutor] oo design/interaction quandary
Hi all, I'm having a design issue that's really bothering me. The code I'm writing is fairly large by now, but I've written what I think is a decent example that illustrates my problem. My app launches threads that each consume messages from a queue, send them to a processor object, and then the processor needs to return the message to the main thread -- if the processing was successful -- and then the main thread puts the message into a different queue. Here's a shortened version of my code. class Processor(object): def __init__(self): self.qhost = 'localhost' self.qport = '5672' self.uname = 'guest' self.pwd = 'guest' self.ssl = false self.vhost = '/' self.exch_name = 'fooX' self.exch_type = 'direct' self.queue_name = 'fooQ' self.conn = amqp.Connection(userid=self.uname, password=self.pwd, host=self.qhost, virtual_host=self.vhost, ssl=self.ssl) self.chan = self.conn.channel() self.chan.exchange_declare(self.exch_name, type=self.exch_type) self.chan.queue_declare(self.qname) self.chan.queue_bind(self.qname, self.exch_name) def consume(self, callback): self.chan.basic_consume(self.qname, callback=callback) while True: self.chan.wait() class Munger(object): def munge(self,msg): if msg % 2 == 0: yield msg class Sender(object): def run(self): p = Processor() m = Munger() for msg in p.consume(m.munge): """ I know this doesn't work right now. This piece of the code should send 'msg' to another queue. """ pass if __name__ == '__main__': s = Sender() s.run() The problem might be obvious to you, but I'll quickly explain: The Sender object (a thread in the real code), is calling p.consume, which just wraps the basic_consume method in py-amqplib. The basic_consume method registers a consumer and a callback with the amqp server. The chan.wait() call blocks and waits for messages. When the messages arrive they are passed to the python callable we passed to the basic_consume method. Therein lies the problem. Messages go from my Processor object, to the Munger object, and this is all initiated by the Sender object, but I can't find a clean way to get messages successfully processed by Munger back to Sender, so that Sender can requeue the message to the new queue. I've thought about having a Queue object in Sender, or maybe registering Sender as an observer of (at different times) the Munger or Processor objects, but I'd like this to be as simple and understandable as possible by other developers, because I'm wanting to make the Processor and Munger objects pluggable (so the Processor can support different queuing protocols, Munger can do literally anything... etc). Ideas welcome. If I've failed to explain some aspect of my issue, give me a poke and I'll expand as best I can. brian Brian K. Jones Python Magazine http://www.pythonmagazine.com My Blog http://www.protocolostomy.com ___ Tutor maillist - Tutor@python.org To unsubscribe or change subscription options: http://mail.python.org/mailman/listinfo/tutor