Author: aconway Date: Wed Dec 2 20:32:12 2009 New Revision: 886297 URL: http://svn.apache.org/viewvc?rev=886297&view=rev Log: Fix test race condition that was causing the test to hang.
Modified: qpid/trunk/qpid/python/qpid/brokertest.py Modified: qpid/trunk/qpid/python/qpid/brokertest.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=886297&r1=886296&r2=886297&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/brokertest.py (original) +++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Dec 2 20:32:12 2009 @@ -352,7 +352,7 @@ self.join() if self.error: raise self.error -class NumberedSender(StoppableThread): +class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ @@ -362,12 +362,14 @@ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.received(n) to be called each time messages are received. """ - StoppableThread.__init__(self) + Thread.__init__(self) self.sender = broker.test.popen( [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING) self.condition = Condition() self.max = max_depth self.received = 0 + self.stopped = False + self.error = None def run(self): try: @@ -375,7 +377,7 @@ while not self.stopped: if self.max: self.condition.acquire() - while self.sent - self.received > self.max: + while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() self.sender.stdin.write(str(self.sent)+"\n") @@ -389,6 +391,16 @@ self.received = count self.condition.notify() self.condition.release() + + def stop(self): + log.debug("NumberedSender.stop") + self.condition.acquire() + self.stopped = True + self.condition.notify() + self.condition.release() + self.join() + log.debug("NumberedSender.stop - joined") + if self.error: raise self.error class NumberedReceiver(Thread): """ @@ -407,30 +419,36 @@ self.lock = Lock() self.error = None self.sender = sender - + + def continue_test(self): + self.lock.acquire() + ret = self.stopat is None or self.received < self.stopat + self.lock.release() + return ret + def run(self): try: self.received = 0 - while self.stopat is None or self.received < self.stopat: - self.lock.acquire() - try: - m = int(self.receiver.stdout.readline()) - assert(m <= self.received) # Allow for duplicates - if (m == self.received): - self.received += 1 - if self.sender: - self.sender.notify_received(self.received) - finally: - self.lock.release() + while self.continue_test(): + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): + self.received += 1 + if self.sender: + self.sender.notify_received(self.received) except Exception, e: + log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02: self.error = RethrownException(e, self.receiver.pname) def stop(self, count): """Returns when received >= count""" + log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02: self.lock.acquire() + log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received)) self.stopat = count self.lock.release() self.join() + log.debug("NumberedReceiver.stop - joined") if self.error: raise self.error class ErrorGenerator(StoppableThread): --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org