Author: aconway
Date: Wed Dec  2 20:32:12 2009
New Revision: 886297

URL: http://svn.apache.org/viewvc?rev=886297&view=rev
Log:
Fix test race condition that was causing the test to hang.

Modified:
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=886297&r1=886296&r2=886297&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Dec  2 20:32:12 2009
@@ -352,7 +352,7 @@
         self.join()
         if self.error: raise self.error
     
-class NumberedSender(StoppableThread):
+class NumberedSender(Thread):
     """
     Thread to run a sender client and send numbered messages until stopped.
     """
@@ -362,12 +362,14 @@
         max_depth: enable flow control, ensure sent - received <= max_depth.
         Requires self.received(n) to be called each time messages are received.
         """
-        StoppableThread.__init__(self)
+        Thread.__init__(self)
         self.sender = broker.test.popen(
             [broker.test.sender_exec, "--port", broker.port()], 
expect=EXPECT_RUNNING)
         self.condition = Condition()
         self.max = max_depth
         self.received = 0
+        self.stopped = False
+        self.error = None
 
     def run(self):
         try:
@@ -375,7 +377,7 @@
             while not self.stopped:
                 if self.max:
                     self.condition.acquire()
-                    while self.sent - self.received > self.max:
+                    while not self.stopped and self.sent - self.received > 
self.max:
                         self.condition.wait()
                     self.condition.release()
                 self.sender.stdin.write(str(self.sent)+"\n")
@@ -389,6 +391,16 @@
         self.received = count
         self.condition.notify()
         self.condition.release()
+
+    def stop(self):
+        log.debug("NumberedSender.stop")
+        self.condition.acquire()
+        self.stopped = True
+        self.condition.notify()
+        self.condition.release()
+        self.join()
+        log.debug("NumberedSender.stop - joined")
+        if self.error: raise self.error
         
 class NumberedReceiver(Thread):
     """
@@ -407,30 +419,36 @@
         self.lock = Lock()
         self.error = None
         self.sender = sender
-        
+
+    def continue_test(self):
+        self.lock.acquire()
+        ret = self.stopat is None or self.received < self.stopat
+        self.lock.release()
+        return ret
+    
     def run(self):
         try:
             self.received = 0
-            while self.stopat is None or self.received < self.stopat:
-                self.lock.acquire()
-                try:
-                    m = int(self.receiver.stdout.readline())
-                    assert(m <= self.received) # Allow for duplicates
-                    if (m == self.received):
-                        self.received += 1
-                        if self.sender:
-                            self.sender.notify_received(self.received)
-                finally:
-                    self.lock.release()
+            while self.continue_test():
+                m = int(self.receiver.stdout.readline())
+                assert(m <= self.received) # Allow for duplicates
+                if (m == self.received):
+                    self.received += 1
+                    if self.sender:
+                        self.sender.notify_received(self.received)
         except Exception, e:
+            log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME 
aconway 2009-12-02: 
             self.error = RethrownException(e, self.receiver.pname)
 
     def stop(self, count):
         """Returns when received >= count"""
+        log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02: 
         self.lock.acquire()
+        log.debug("NumberedReceiver.stop at %d, received=%d" % (count, 
self.received))
         self.stopat = count
         self.lock.release()
         self.join()
+        log.debug("NumberedReceiver.stop - joined")
         if self.error: raise self.error
 
 class ErrorGenerator(StoppableThread):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to