Author: rhs Date: Thu Sep 3 18:22:10 2009 New Revision: 811066 URL: http://svn.apache.org/viewvc?rev=811066&view=rev Log: added timeout option to send
Modified: qpid/trunk/qpid/python/qpid/messaging.py qpid/trunk/qpid/python/qpid/tests/messaging.py Modified: qpid/trunk/qpid/python/qpid/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=811066&r1=811065&r2=811066&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/messaging.py Thu Sep 3 18:22:10 2009 @@ -544,18 +544,25 @@ return self.queued - self.acked @synchronized - def send(self, object, sync=True): + def send(self, object, sync=True, timeout=None): """ Send a message. If the object passed in is of type L{unicode}, L{str}, L{list}, or L{dict}, it will automatically be wrapped in a L{Message} and sent. If it is of type L{Message}, it will be sent - directly. + directly. If the sender capacity is not L{UNLIMITED} then send + will block until there is available capacity to send the message. + If the timeout parameter is specified, then send will throw an + L{InsufficientCapacity} exception if capacity does not become + available within the specified time. @type object: unicode, str, list, dict, Message @param object: the message or content to send @type sync: boolean @param sync: if true then block until the message is sent + + @type timeout: float + @param timeout: the time to wait for available capacity """ if not self.session.connection._connected or self.session.closing: @@ -569,7 +576,8 @@ if self.capacity is not UNLIMITED: if self.capacity <= 0: raise InsufficientCapacity("capacity = %s" % self.capacity) - self._ewait(lambda: self.pending() < self.capacity) + if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout): + raise InsufficientCapacity("capacity = %s" % self.capacity) # XXX: what if we send the same message to multiple senders? message._sender = self Modified: qpid/trunk/qpid/python/qpid/tests/messaging.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging.py?rev=811066&r1=811065&r2=811066&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/tests/messaging.py (original) +++ qpid/trunk/qpid/python/qpid/tests/messaging.py Thu Sep 3 18:22:10 2009 @@ -599,6 +599,22 @@ def testSendAsyncCapacityUNLIMITED(self): self.asyncTest(UNLIMITED) + def testCapacityTimeout(self): + self.snd.capacity = 1 + msgs = [] + caught = False + while len(msgs) < 100: + m = self.content("testCapacity", len(msgs)) + try: + self.snd.send(m, sync=False, timeout=0) + msgs.append(m) + except InsufficientCapacity: + caught = True + break + self.drain(self.rcv, expected=msgs) + self.ssn.acknowledge() + assert caught, "did not exceed capacity" + class MessageTests(Base): def testCreateString(self): --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org