This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d03790  DISPATCH-2319: Modified test to verify closed receiver before 
sending second batch of presettled messages
6d03790 is described below

commit 6d03790f6a47032bff4f751d57b2035349056b7c
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Tue Feb 1 15:15:41 2022 -0500

    DISPATCH-2319: Modified test to verify closed receiver before sending 
second batch of presettled messages
---
 tests/system_tests_one_router.py | 119 ++++++++++++++++++++++++++-------------
 1 file changed, 80 insertions(+), 39 deletions(-)

diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 8da2ebc..ac6dcf1 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -659,10 +659,9 @@ class OneRouterTest(TestCase):
     def test_43_dropped_presettled_receiver_stops(self):
         local_node = Node.connect(self.address, timeout=TIMEOUT)
         res = local_node.query('org.apache.qpid.dispatch.router')
-        deliveries_ingress = res.attribute_names.index('deliveriesIngress')
-        presettled_dropped_count = 
res.attribute_names.index('droppedPresettledDeliveries')
-        ingress_delivery_count = res.results[0][deliveries_ingress]
-        test = DroppedPresettledTest(self.address, 200, 
ingress_delivery_count, presettled_dropped_count)
+        presettled_dropped_count_index = 
res.attribute_names.index('droppedPresettledDeliveries')
+        presettled_dropped_count = 
res.results[0][presettled_dropped_count_index]
+        test = DroppedPresettledTest(self.address, 200, 
presettled_dropped_count)
         test.run()
         self.assertIsNone(test.error)
 
@@ -1462,7 +1461,39 @@ class PreSettled (MessagingHandler) :
             self.bail(None)
 
 
-class PresettledCustomTimeout:
+class SendPresettledAfterReceiverCloses(object):
+    def __init__(self, parent):
+        self.parent = parent
+        self.num_tries = 0
+
+    def on_timer_task(self, event):
+        self.num_tries += 1
+        local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
+        res = local_node.query('org.apache.qpid.dispatch.router.link')
+        owning_addr_index = res.attribute_names.index('owningAddr')
+        has_address = False
+        for out in res.results:
+            owning_addr = out[owning_addr_index]
+            # Check if the receiver's address is present in the router's 
address table.
+            # If the address is still there, try one more time until 
self.parent.max_tries.
+            if self.parent.addr in owning_addr:
+                has_address = True
+                break
+
+        if has_address:
+            if self.num_tries == self.parent.max_tries:
+                self.parent.bail("Address %s is still in routing table" % 
owning_addr)
+            else:
+                self.parent.schedule_send_timer()
+        else:
+            # Address is not there in the address table anymore.
+            # Send the remaining messages. These presettled messages must be
+            # dropped by the router which we will verify using the router's
+            # droppedPresettledDeliveries
+            self.parent.send_remaining()
+
+
+class PresettledCustomTimeout(object):
     def __init__(self, parent):
         self.parent = parent
         self.num_tries = 0
@@ -1471,34 +1502,36 @@ class PresettledCustomTimeout:
         self.num_tries += 1
         local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
         res = local_node.query('org.apache.qpid.dispatch.router')
-        deliveries_ingress = res.attribute_names.index(
-            'deliveriesIngress')
-        presettled_deliveries_dropped = res.attribute_names.index(
-            'droppedPresettledDeliveries')
-        ingress_delivery_count = res.results[0][deliveries_ingress]
-        self.parent.cancel_custom()
+        presettled_deliveries_dropped_index = 
res.attribute_names.index('droppedPresettledDeliveries')
+        presettled_dropped_count =  
res.results[0][presettled_deliveries_dropped_index]
 
-        deliveries_dropped_diff = presettled_deliveries_dropped - 
self.parent.begin_dropped_presettled_count
+        deliveries_dropped_diff = presettled_dropped_count - 
self.parent.begin_dropped_presettled_count
 
         # Without the fix for DISPATCH-1213  the ingress count will be less 
than
         # 200 because the sender link has stalled. The q2_holdoff happened
         # and so all the remaining messages are still in the
         # proton buffers.
-        deliveries_ingress_diff = ingress_delivery_count - 
self.parent.begin_ingress_count
-        if deliveries_ingress_diff + deliveries_dropped_diff > 
self.parent.n_messages:
+        if deliveries_dropped_diff == self.parent.n_messages - 
self.parent.max_receive:
             self.parent.bail(None)
         else:
             if self.num_tries == self.parent.max_tries:
                 self.parent.bail("Messages sent to the router is %d, "
-                                 "Messages processed by the router is %d" %
+                                 "Messages dropped by the router is %d" %
                                  (self.parent.n_messages,
-                                  deliveries_ingress_diff + 
deliveries_dropped_diff))
+                                  deliveries_dropped_diff))
             else:
-                self.parent.schedule_timer()
+                self.parent.schedule_custom_timer()
 
 
 class DroppedPresettledTest(MessagingHandler):
-    def __init__(self, addr, n_messages, begin_ingress_count, 
begin_dropped_presettled_count):
+    """
+    First send 10 large messages and a receiver receives them all and exits.
+    These first 10 messages are presettled messages and the router network did 
not
+    drop them.
+    Now send an additional 190 messages and make sure they are dropped by 
checking
+    the droppedPresettledCount
+    """
+    def __init__(self, addr, n_messages, begin_dropped_presettled_count):
         super(DroppedPresettledTest, self).__init__()
         self.addr = addr
         self.n_messages = n_messages
@@ -1512,19 +1545,21 @@ class DroppedPresettledTest(MessagingHandler):
         self.test_timer = None
         self.max_receive = 10
         self.custom_timer = None
+        self.send_timer = None
         self.timer = None
         self.begin_dropped_presettled_count = begin_dropped_presettled_count
-        self.begin_ingress_count = begin_ingress_count
         self.str1 = "0123456789abcdef"
         self.msg_str = ""
         self.max_tries = 10
         self.reactor = None
         for i in range(8192):
             self.msg_str += self.str1
-        self.timer_instance = PresettledCustomTimeout(self)
 
-    def schedule_timer(self):
-        self.custom_timer = self.reactor.schedule(0.5, self.timer_instance)
+    def schedule_custom_timer(self):
+        self.custom_timer = self.reactor.schedule(0.5, 
PresettledCustomTimeout(self))
+
+    def schedule_send_timer(self):
+        self.send_timer = self.reactor.schedule(0.5, 
SendPresettledAfterReceiverCloses(self))
 
     def run(self):
         Container(self).run()
@@ -1535,7 +1570,10 @@ class DroppedPresettledTest(MessagingHandler):
         if self.recv_conn:
             self.recv_conn.close()
         self.timer.cancel()
-        self.custom_timer.cancel()
+        if self.custom_timer:
+            self.custom_timer.cancel()
+        if self.send_timer:
+            self.send_timer.cancel()
 
     def timeout(self,):
         self.bail("Timeout Expired: %d messages received, %d expected." %
@@ -1551,18 +1589,24 @@ class DroppedPresettledTest(MessagingHandler):
                                                     "test_43")
         self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
 
-    def cancel_custom(self):
-        self.custom_timer.cancel()
+    def send(self):
+        msg = Message(id=(self.n_sent + 1),
+                      body={'sequence': (self.n_sent + 1),
+                            'msg_str': self.msg_str})
+        # Presettle the delivery.
+        dlv = self.sender.send(msg)
+        dlv.settle()
+        self.n_sent += 1
 
-    def on_sendable(self, event):
+    def send_remaining(self):
         while self.n_sent < self.n_messages:
-            msg = Message(id=(self.n_sent + 1),
-                          body={'sequence': (self.n_sent + 1),
-                                'msg_str': self.msg_str})
-            # Presettle the delivery.
-            dlv = self.sender.send(msg)
-            dlv.settle()
-            self.n_sent += 1
+            self.send()
+        self.schedule_custom_timer()
+
+    def on_sendable(self, event):
+        # Send only self.max_receive messages.
+        while self.n_sent < self.max_receive:
+            self.send()
 
     def on_message(self, event):
         self.n_received += 1
@@ -1570,12 +1614,9 @@ class DroppedPresettledTest(MessagingHandler):
             # Receiver bails after receiving max_receive messages.
             self.receiver.close()
             self.recv_conn.close()
-
-            # The sender is only sending 200 large messages which is less
-            # that the initial credit of 250 that the router gives.
-            # Lets do a qdstat to find out if all 200 messages is handled
-            # by the router.
-            self.schedule_timer()
+            # When self.max_receive messages have been received and the 
receiver has been closed
+            # we try to check to see if the receiver's address is gone from 
the address table.
+            self.schedule_send_timer()
 
 
 class MulticastUnsettled (MessagingHandler) :

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to