This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 90e156f  DISPATCH-1757: Added code to print out sender stats so we can see what is going on. This closes #1232
90e156f is described below

commit 90e156f41ef08dc083bb5346373461418f91cf1c
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Mon May 24 09:02:57 2021 -0400

    DISPATCH-1757: Added code to print out sender stats so we can see what is going on. This closes #1232
---
 tests/system_test.py              | 29 ++++++++++++++++++---
 tests/system_tests_edge_router.py | 55 +++++++++++++++++++++++----------------
 2 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/tests/system_test.py b/tests/system_test.py
index 8553810..bc35300 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -947,6 +947,20 @@ class AsyncTestReceiver(MessagingHandler):
     """
     Empty = Queue.Empty
 
+    class MyQueue(Queue.Queue):
+        def __init__(self, receiver):
+            self._async_receiver = receiver
+            super(AsyncTestReceiver.MyQueue, self).__init__()
+
+        def get(self, timeout=TIMEOUT):
+            self._async_receiver.num_queue_gets += 1
+            msg = super(AsyncTestReceiver.MyQueue, self).get(timeout=timeout)
+            return msg
+
+        def put(self, msg):
+            self._async_receiver.num_queue_puts += 1
+            super(AsyncTestReceiver.MyQueue, self).put(msg)
+
     def __init__(self, address, source, conn_args=None, container_id=None,
                  wait=True, recover_link=False, msg_args=None):
         if msg_args is None:
@@ -955,7 +969,7 @@ class AsyncTestReceiver(MessagingHandler):
         self.address = address
         self.source = source
         self.conn_args = conn_args
-        self.queue = Queue.Queue()
+        self.queue = AsyncTestReceiver.MyQueue(self)
         self._conn = None
         self._container = Container(self)
         cid = container_id or "ATR-%s:%s" % (source, uuid.uuid4())
@@ -967,8 +981,14 @@ class AsyncTestReceiver(MessagingHandler):
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._thread.start()
+        self.num_queue_puts = 0
+        self.num_queue_gets = 0
         if wait and self._ready.wait(timeout=TIMEOUT) is False:
             raise Exception("Timed out waiting for receiver start")
+        self.queue_stats = "self.num_queue_puts=%d, self.num_queue_gets=%d"
+
+    def get_queue_stats(self):
+        return self.queue_stats % (self.num_queue_puts, self.num_queue_gets)
 
     def _main(self):
         self._container.timeout = 0.5
@@ -996,8 +1016,7 @@ class AsyncTestReceiver(MessagingHandler):
 
     def on_connection_opened(self, event):
         kwargs = {'source': self.source}
-        rcv = event.container.create_receiver(event.connection,
-                                              **kwargs)
+        event.container.create_receiver(event.connection, **kwargs)
 
     def on_link_opened(self, event):
         self._ready.set()
@@ -1059,6 +1078,7 @@ class AsyncTestSender(MessagingHandler):
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._thread.start()
+        self.msg_stats = "self.sent=%d, self.accepted=%d, self.released=%d, self.modified=%d, self.rejected=%d"
 
     def _main(self):
         self._container.timeout = 0.5
@@ -1066,6 +1086,9 @@ class AsyncTestSender(MessagingHandler):
         while self._container.process():
             self._check_if_done()
 
+    def get_msg_stats(self):
+        return self.msg_stats % (self.sent, self.accepted, self.released, self.modified, self.rejected)
+
     def wait(self):
         # don't stop it - wait until everything is sent
         self._thread.join(timeout=TIMEOUT)
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index a608498..fcb41f5 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -1685,8 +1685,13 @@ class LinkRouteProxyTest(TestCase):
         ts = AsyncTestSender(sender, address, count)
         ts.wait()  # wait until all sent
         for i in range(count):
-            tr.queue.get(timeout=TIMEOUT)
+            try:
+                tr.queue.get()
+            except AsyncTestReceiver.Empty:
+                return "Sender Stats=" + ts.get_msg_stats() + " Receiver Queue Stats=" + tr.get_queue_stats()
+
         tr.stop()
+        return None
 
     def test_01_immedate_detach_reattach(self):
         if self.skip['test_01'] :
@@ -1735,7 +1740,7 @@ class LinkRouteProxyTest(TestCase):
                              message=Message(body="HEY HO LET'S GO!"))
         tx.wait()
 
-        msg = rx.queue.get(timeout=TIMEOUT)
+        msg = rx.queue.get()
         self.assertTrue(msg.body == "HEY HO LET'S GO!")
         rx.stop()
         fs.join()
@@ -1847,10 +1852,11 @@ class LinkRouteProxyTest(TestCase):
                               attributes={'role': 'edge',
                                           'port': self.INTA_edge_port})
         self.INT_B.wait_address("Edge1/*", count=2)
-        self._test_traffic(self.INT_B.listener,
-                           self.INT_B.listener,
-                           "Edge1/One",
-                           count=5)
+        out = self._test_traffic(self.INT_B.listener,
+                                 self.INT_B.listener,
+                                 "Edge1/One",
+                                 count=5)
+        self.assertIsNone(out, out)
         fs.join()
         self.assertEqual(5, fs.in_count)
         self.assertEqual(5, fs.out_count)
@@ -1937,11 +1943,11 @@ class LinkRouteProxyTest(TestCase):
         fs = FakeService(self.EA1.route_container)
         self.INT_B.wait_address("CfgLinkRoute1", count=2)
 
-        self._test_traffic(self.INT_B.listener,
-                           self.INT_B.listener,
-                           "CfgLinkRoute1/hi",
-                           count=5)
-
+        out = self._test_traffic(self.INT_B.listener,
+                                 self.INT_B.listener,
+                                 "CfgLinkRoute1/hi",
+                                 count=5)
+        self.assertIsNone(out, out)
         fs.join()
         self.assertEqual(5, fs.in_count)
         self.assertEqual(5, fs.out_count)
@@ -1955,10 +1961,11 @@ class LinkRouteProxyTest(TestCase):
         fs = FakeService(self.EB1.route_container)
         self.INT_A.wait_address("*.cfg.pattern.#", count=2)
 
-        self._test_traffic(self.INT_A.listener,
-                           self.INT_A.listener,
-                           "MATCH.cfg.pattern",
-                           count=5)
+        out = self._test_traffic(self.INT_A.listener,
+                                 self.INT_A.listener,
+                                 "MATCH.cfg.pattern",
+                                 count=5)
+        self.assertIsNone(out, out)
 
         fs.join()
         self.assertEqual(5, fs.in_count)
@@ -1989,16 +1996,18 @@ class LinkRouteProxyTest(TestCase):
         self.assertEqual(2, len(self._get_address(self.INT_A, "Conn/*/One")))
 
         # between interiors
-        self._test_traffic(self.INT_B.listener,
-                           self.INT_A.listener,
-                           "Conn/BLAB/One",
-                           count=5)
+        out = self._test_traffic(self.INT_B.listener,
+                                 self.INT_A.listener,
+                                 "Conn/BLAB/One",
+                                 count=5)
+        self.assertIsNone(out, out)
 
         # edge to edge
-        self._test_traffic(self.EB1.listener,
-                           self.EA1.listener,
-                           "Conn/BLECH/One",
-                           count=5)
+        out = self._test_traffic(self.EB1.listener,
+                                 self.EA1.listener,
+                                 "Conn/BLECH/One",
+                                 count=5)
+        self.assertIsNone(out, out)
         fs.join()
         self.assertEqual(10, fs.in_count)
         self.assertEqual(10, fs.out_count)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to