Author: rhs Date: Thu Nov 12 18:33:25 2009 New Revision: 835488 URL: http://svn.apache.org/viewvc?rev=835488&view=rev Log: removed listeners in favor of next_receiver
Modified: qpid/trunk/qpid/python/qpid/messaging.py qpid/trunk/qpid/python/qpid/tests/messaging.py Modified: qpid/trunk/qpid/python/qpid/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=835488&r1=835487&r2=835488&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/messaging.py Thu Nov 12 18:33:25 2009 @@ -295,10 +295,6 @@ self.closed = False self._lock = connection._lock - self.running = True - self.thread = Thread(target = self.run) - self.thread.setDaemon(True) - self.thread.start() def __repr__(self): return "<Session %s>" % self.name @@ -342,8 +338,8 @@ @synchronized def receiver(self, source, **options): """ - Creates a receiver that may be used to actively fetch or to listen - for the arrival of L{Messages<Message>} from the specified source. + Creates a receiver that may be used to fetch L{Messages<Message>} + from the specified source. @type source: str @param source: the source of L{Messages<Message>} @@ -392,6 +388,13 @@ return None @synchronized + def next_receiver(self, timeout=None): + if self._ewait(lambda: self.incoming, timeout): + return self.incoming[0]._receiver + else: + raise Empty + + @synchronized def acknowledge(self, message=None, sync=True): """ Acknowledge the given L{Message}. If message is None, then all @@ -465,29 +468,8 @@ """ for rcv in self.receivers: rcv.stop() - # TODO: think about stopping individual receivers in listen mode - self._wait(lambda: self._peek(self._pred) is None) self.started = False - def _pred(self, m): - return m._receiver.listener is not None - - @synchronized - def run(self): - self.running = True - try: - while True: - msg = self._get(self._pred) - if msg is None: - break; - else: - msg._receiver.listener(msg) - if self._peek(self._pred) is None: - self.connection._waiter.notifyAll() - finally: - self.running = False - self.connection._waiter.notifyAll() - @synchronized def close(self): """ @@ -498,10 +480,7 @@ self.closing = True self._wakeup() - self._ewait(lambda: self.closed and not self.running) - while self.thread.isAlive(): - self.thread.join(3) - self.thread = None + self._ewait(lambda: self.closed) # XXX: should be able to express this condition through API calls self._ewait(lambda: not self.outgoing and not self.acked) self.connection._remove_session(self) @@ -636,8 +615,7 @@ """ Receives incoming messages from a remote source. Messages may be - actively fetched with L{fetch} or a listener may be installed with - L{listen}. + fetched with L{fetch}. """ def __init__(self, session, index, source, options, started): @@ -659,7 +637,6 @@ self.linked = False self.closing = False self.closed = False - self.listener = None self._lock = self.session._lock def _wakeup(self): @@ -694,16 +671,6 @@ else: return self.capacity - @synchronized - def listen(self, listener=None): - """ - Sets the message listener for this receiver. - - @type listener: callable - @param listener: a callable object to be notified on message arrival - """ - self.listener = listener - def _pred(self, msg): return msg._receiver == self Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=835488&r1=835487&r2=835488&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/tests/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Nov 12 18:33:25 2009 @@ -228,6 +228,35 @@ assert msg.content == content self.ssn.acknowledge(msg) + def testNextReceiver(self): + ADDR = 'test-next-rcv-queue {create: always}' + rcv1 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) + + # XXX: this won't work if it is before the receiver creation + self.ssn.start() + + snd = self.ssn.sender(ADDR) + + msgs = [] + for i in range(10): + content = self.content("testNextReceiver", i) + snd.send(content) + msgs.append(content) + + fetched = [] + try: + while True: + rcv = self.ssn.next_receiver(timeout=self.delay()) + assert rcv in (rcv1, rcv2, rcv3) + assert rcv.pending() > 0 + fetched.append(rcv.fetch().content) + except Empty: + pass + assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) + self.ssn.acknowledge() + def testStart(self): START_Q = 'test-start-queue {create: always}' rcv = self.ssn.receiver(START_Q) @@ -437,22 +466,6 @@ self.snd.send(content) return content - def testListen(self): - msgs = Queue() - def listener(m): - msgs.put(m) - self.ssn.acknowledge(m) - self.rcv.listen(listener) - content = self.send("testListen") - try: - msg = msgs.get(timeout=self.delay()) - assert False, "did not expect message: %s" % msg - except QueueEmpty: - pass - self.rcv.start() - msg = msgs.get(timeout=self.delay()) - assert msg.content == content - def testFetch(self): try: msg = self.rcv.fetch(0) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org