Author: rhs
Date: Thu Nov 12 18:33:25 2009
New Revision: 835488

URL: http://svn.apache.org/viewvc?rev=835488&view=rev
Log:
Removed receiver listeners (Receiver.listen and the Session dispatch thread) in favor of Session.next_receiver.

Modified:
    qpid/trunk/qpid/python/qpid/messaging.py
    qpid/trunk/qpid/python/qpid/tests/messaging.py

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=835488&r1=835487&r2=835488&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Thu Nov 12 18:33:25 2009
@@ -295,10 +295,6 @@
     self.closed = False
 
     self._lock = connection._lock
-    self.running = True
-    self.thread = Thread(target = self.run)
-    self.thread.setDaemon(True)
-    self.thread.start()
 
   def __repr__(self):
     return "<Session %s>" % self.name
@@ -342,8 +338,8 @@
   @synchronized
   def receiver(self, source, **options):
     """
-    Creates a receiver that may be used to actively fetch or to listen
-    for the arrival of L{Messages<Message>} from the specified source.
+    Creates a receiver that may be used to fetch L{Messages<Message>}
+    from the specified source.
 
     @type source: str
     @param source: the source of L{Messages<Message>}
@@ -392,6 +388,13 @@
     return None
 
   @synchronized
+  def next_receiver(self, timeout=None):
+    if self._ewait(lambda: self.incoming, timeout):
+      return self.incoming[0]._receiver
+    else:
+      raise Empty
+
+  @synchronized
   def acknowledge(self, message=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
@@ -465,29 +468,8 @@
     """
     for rcv in self.receivers:
       rcv.stop()
-    # TODO: think about stopping individual receivers in listen mode
-    self._wait(lambda: self._peek(self._pred) is None)
     self.started = False
 
-  def _pred(self, m):
-    return m._receiver.listener is not None
-
-  @synchronized
-  def run(self):
-    self.running = True
-    try:
-      while True:
-        msg = self._get(self._pred)
-        if msg is None:
-          break;
-        else:
-          msg._receiver.listener(msg)
-          if self._peek(self._pred) is None:
-            self.connection._waiter.notifyAll()
-    finally:
-      self.running = False
-      self.connection._waiter.notifyAll()
-
   @synchronized
   def close(self):
     """
@@ -498,10 +480,7 @@
 
     self.closing = True
     self._wakeup()
-    self._ewait(lambda: self.closed and not self.running)
-    while self.thread.isAlive():
-      self.thread.join(3)
-    self.thread = None
+    self._ewait(lambda: self.closed)
     # XXX: should be able to express this condition through API calls
     self._ewait(lambda: not self.outgoing and not self.acked)
     self.connection._remove_session(self)
@@ -636,8 +615,7 @@
 
   """
   Receives incoming messages from a remote source. Messages may be
-  actively fetched with L{fetch} or a listener may be installed with
-  L{listen}.
+  fetched with L{fetch}.
   """
 
   def __init__(self, session, index, source, options, started):
@@ -659,7 +637,6 @@
     self.linked = False
     self.closing = False
     self.closed = False
-    self.listener = None
     self._lock = self.session._lock
 
   def _wakeup(self):
@@ -694,16 +671,6 @@
     else:
       return self.capacity
 
-  @synchronized
-  def listen(self, listener=None):
-    """
-    Sets the message listener for this receiver.
-
-    @type listener: callable
-    @param listener: a callable object to be notified on message arrival
-    """
-    self.listener = listener
-
   def _pred(self, msg):
     return msg._receiver == self
 

Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=835488&r1=835487&r2=835488&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Nov 12 18:33:25 2009
@@ -228,6 +228,35 @@
     assert msg.content == content
     self.ssn.acknowledge(msg)
 
+  def testNextReceiver(self):
+    ADDR = 'test-next-rcv-queue {create: always}'
+    rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+    rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+    rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED)
+
+    # XXX: this won't work if it is before the receiver creation
+    self.ssn.start()
+
+    snd = self.ssn.sender(ADDR)
+
+    msgs = []
+    for i in range(10):
+      content = self.content("testNextReceiver", i)
+      snd.send(content)
+      msgs.append(content)
+
+    fetched = []
+    try:
+      while True:
+        rcv = self.ssn.next_receiver(timeout=self.delay())
+        assert rcv in (rcv1, rcv2, rcv3)
+        assert rcv.pending() > 0
+        fetched.append(rcv.fetch().content)
+    except Empty:
+      pass
+    assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched)
+    self.ssn.acknowledge()
+
   def testStart(self):
     START_Q = 'test-start-queue {create: always}'
     rcv = self.ssn.receiver(START_Q)
@@ -437,22 +466,6 @@
     self.snd.send(content)
     return content
 
-  def testListen(self):
-    msgs = Queue()
-    def listener(m):
-      msgs.put(m)
-      self.ssn.acknowledge(m)
-    self.rcv.listen(listener)
-    content = self.send("testListen")
-    try:
-      msg = msgs.get(timeout=self.delay())
-      assert False, "did not expect message: %s" % msg
-    except QueueEmpty:
-      pass
-    self.rcv.start()
-    msg = msgs.get(timeout=self.delay())
-    assert msg.content == content
-
   def testFetch(self):
     try:
       msg = self.rcv.fetch(0)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to