Re: [Tutor] oo design/interaction quandary

2009-12-26 Thread Alan Gauld


"Brian Jones"  wrote

My app launches threads that each consume messages from a queue, send 
them
to a processor object, and then the processor needs to return the message 
to

the main thread -- if the processing was successful -- and then the main
thread puts the message into a different queue.


Why don't the messages munge and process themselves?
What is the purpose of the processor and munger classes?
Aren't they just functions and therefore should e metjods of
the thing upon which they operate - ie Message?

HTH,

--
Alan Gauld
Author of the Learn to Program web site
http://www.alan-g.me.uk/ 



___
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
http://mail.python.org/mailman/listinfo/tutor


Re: [Tutor] oo design/interaction quandary

2009-12-25 Thread Kent Johnson
On Fri, Dec 25, 2009 at 9:03 PM, Brian Jones  wrote:
> Hi all,
> I'm having a design issue that's really bothering me. The code I'm writing
> is fairly large by now, but I've written what I think is a decent example
> that illustrates my problem.
> My app launches threads that each consume messages from a queue, send them
> to a processor object, and then the processor needs to return the message to
> the main thread -- if the processing was successful -- and then the main
> thread puts the message into a different queue.
> Here's a shortened version of my code.
>
> class Processor(object):
> def __init__(self):
> self.qhost = 'localhost'
> self.qport = '5672'
> self.uname = 'guest'
> self.pwd = 'guest'
> self.ssl = false
> self.vhost = '/'
> self.exch_name = 'fooX'
> self.exch_type = 'direct'
> self.queue_name = 'fooQ'
> self.conn = amqp.Connection(userid=self.uname, password=self.pwd,
> host=self.qhost,
> virtual_host=self.vhost, ssl=self.ssl)
> self.chan = self.conn.channel()
> self.chan.exchange_declare(self.exch_name, type=self.exch_type)
> self.chan.queue_declare(self.qname)
> self.chan.queue_bind(self.qname, self.exch_name)
>
> def consume(self, callback):
> self.chan.basic_consume(self.qname, callback=callback)
> while True:
> self.chan.wait()
>
>
> class Munger(object):
> def munge(self,msg):
> if msg % 2 == 0:
> yield msg

Why is this a generator function? From what I understand looking at
channel.py in py-amqplib this should just be a callable that processes
the message.

If you want to notify the Sender that the message has been processed,
I would either make another Queue to pass messages back, or give the
Munger a callback function to pass successful messages to. The
difference between them is that a Queue is asynchronous and you would
have to process the output in another thread; a callback is
synchronous and will run in the same thread as the munger. Either of
these can be handled in a base class leaving subclasses to just
implement the message handling. For example, using a callback (a Queue
would be similar):

class MungerBase(object):
  def __init__(self, callback):
self.callback = callback

  def munge(self, msg):
if self.process(msg):
  self.callback(msg)

A real munger looks like this:

class Munger(MungerBase):
  def process(self, msg):
if msg % 2 == 0:
  return True
return False


> class Sender(object):
> def run(self):
> p = Processor()
> m = Munger()
> for msg in p.consume(m.munge):
> """
> I know this doesn't work right now. This
> piece of the code should send 'msg' to another queue.
>
> """
> pass

This would look like
class Sender(object):
  def run(self):
p = Processor()
m = Munger(self.handle_processed_message)
p.consume(m.munge)

  def handle_processed_message(self, msg):
# send 'msg' to another queue or whatever

Kent
___
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
http://mail.python.org/mailman/listinfo/tutor


[Tutor] oo design/interaction quandary

2009-12-25 Thread Brian Jones
Hi all,

I'm having a design issue that's really bothering me. The code I'm writing
is fairly large by now, but I've written what I think is a decent example
that illustrates my problem.

My app launches threads that each consume messages from a queue, send them
to a processor object, and then the processor needs to return the message to
the main thread -- if the processing was successful -- and then the main
thread puts the message into a different queue.

Here's a shortened version of my code.


class Processor(object):
def __init__(self):
self.qhost = 'localhost'
self.qport = '5672'
self.uname = 'guest'
self.pwd = 'guest'
self.ssl = false
self.vhost = '/'
self.exch_name = 'fooX'
self.exch_type = 'direct'
self.queue_name = 'fooQ'
self.conn = amqp.Connection(userid=self.uname,
password=self.pwd, host=self.qhost,
virtual_host=self.vhost, ssl=self.ssl)
self.chan = self.conn.channel()
self.chan.exchange_declare(self.exch_name, type=self.exch_type)
self.chan.queue_declare(self.qname)
self.chan.queue_bind(self.qname, self.exch_name)

def consume(self, callback):
self.chan.basic_consume(self.qname, callback=callback)
while True:
self.chan.wait()


class Munger(object):
def munge(self,msg):
if msg % 2 == 0:
yield msg

class Sender(object):
def run(self):
p = Processor()
m = Munger()
for msg in p.consume(m.munge):
"""
I know this doesn't work right now. This
piece of the code should send 'msg' to another queue.

"""
pass



if __name__ == '__main__':
s = Sender()
s.run()


The problem might be obvious to you, but I'll quickly explain:

The Sender object (a thread in the real code), is  calling p.consume,
which just wraps the basic_consume method in py-amqplib. The
basic_consume method registers a consumer and a callback with the amqp
server. The chan.wait() call blocks and waits for messages. When the
messages arrive they are passed to the python callable we passed to
the basic_consume method. Therein lies the problem.

Messages go from my Processor object, to the Munger object, and this
is all initiated by the Sender object, but I can't find a clean way to
get messages successfully processed by Munger back to Sender, so that
Sender can requeue the message to the new queue.

I've thought about having a Queue object in Sender, or maybe
registering Sender as an observer of (at different times) the Munger
or Processor objects, but I'd like this to be as simple and
understandable as possible by other developers, because I'm wanting to
make the Processor and Munger objects pluggable (so the Processor can
support different queuing protocols, Munger can do literally
anything... etc).

Ideas welcome. If I've failed to explain some aspect of my issue, give
me a poke and I'll expand as best I can.


brian


Brian K. Jones

Python Magazine  http://www.pythonmagazine.com
My Blog  http://www.protocolostomy.com
___
Tutor maillist  -  Tutor@python.org
To unsubscribe or change subscription options:
http://mail.python.org/mailman/listinfo/tutor