This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new b12d49a  DISPATCH-2227: simplify the 
system_tests_distribution.test_09_closest_linear
b12d49a is described below

commit b12d49a18f5bf055487e1809f854024c935b36a0
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Fri Sep 17 14:59:58 2021 -0400

    DISPATCH-2227: simplify the system_tests_distribution.test_09_closest_linear
    
    This closes #1370
---
 tests/system_tests_distribution.py | 420 +++++++++++++++++++------------------
 1 file changed, 211 insertions(+), 209 deletions(-)

diff --git a/tests/system_tests_distribution.py 
b/tests/system_tests_distribution.py
index 4e200f7..e6126a7 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -17,11 +17,11 @@
 # under the License.
 #
 
-import sys
+from time import sleep
 
-from proton import Message
+from proton import Message, Delivery
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout
-from system_test import unittest
+from system_test import unittest, Logger
 from proton.handlers import MessagingHandler
 from proton.reactor import Container, LinkOption, ApplicationEvent, 
EventInjector
 
@@ -44,6 +44,10 @@ class AddressCheckResponse(object):
     def __getattr__(self, key):
         return self.attrs[key]
 
+    def __str__(self):
+        return "Address Check Response: status=%s desc=%s attrs=%s" % \
+            (self.status_code, self.status_description, self.attrs)
+
 
 class AddressChecker (object):
     """
@@ -1643,9 +1647,10 @@ class DistributionTests (TestCase):
 next_link_sequence = 1
 
 
-def link_name():
+def link_name(suffix=None):
     global next_link_sequence
-    name = "link-name.%d" % next_link_sequence
+    suffix = suffix or "name"
+    name = "link-%s.%d" % (suffix, next_link_sequence)
     next_link_sequence += 1
     return name
 
@@ -2172,26 +2177,50 @@ class LinkAttachRouting (MessagingHandler):
 
 class ClosestTest (MessagingHandler):
     """
-    Test whether distance-based message routing works in a
-    3-router network. The network may be linear or mesh,
-    depending on which routers the caller gives us.
+    Test whether distance-based message routing works in a 3-router
+    network. The network may be linear or mesh, depending on which routers the
+    caller gives us.
 
     (Illustration is a linear network.)
 
     sender -----> Router_1 -----> Router_2 -----> Router_3
                      |              |                |
                      v              v                v
-                  rcvr_1_a       rcvr_2_a         rcvr_3_a
-                  rcvr_1_b       rcvr_2_b         rcvr_3_b
+                  rcvr_1         rcvr_2           rcvr_3
+
+    With a linear network of 3 routers, set up a sender on router_1, and then 1
+    receiver each on all 3 routers.  Requirement: router 2 is closer than
+    router 3 by one hop.
+
+    Once the closest pair of receivers has received the required amount of
+    messages they are closed. Neither of the other receivers should have
+    received any messages, as they were not the closest receivers.
+
+    Repeat until all three routers have received messages.
+
+    The test is set up in phases to ensure there are no races between fast/slow
+    clients and routers:
+
+    Phase 1: bring up all connections and create receivers, wait until all
+    receivers can finished link setup.
 
-    With a linear network of 3 routers, set up a sender on
-    router_1, and then 2 receivers each on all 3 routers.
+    Phase 2: poll routers until the subscriber count shows all receivers are
+    ready.
 
+    Phase 3: start the sender, wait until on_sendable triggers
+
+    Phase 4: send test messages, verify distribution.
+
+    Once a batch of messages has completely arrived at the current closest
+    receiver, close that receiver. Note that there can be a few seconds before
+    that loss of link propagates to all three routers.  During that time any
+    sent messages may fail with outcome RELEASED - that is expected.
     """
 
     def __init__(self, test_name, router_1, router_2, router_3, addr_suffix,
                  print_debug=False):
         super(ClosestTest, self).__init__(prefetch=0)
+        self.test_name   = test_name
         self.error       = None
         self.router_1    = router_1
         self.router_2    = router_2
@@ -2199,58 +2228,45 @@ class ClosestTest (MessagingHandler):
         self.addr_suffix = addr_suffix
         self.dest        = "closest/" + addr_suffix
 
-        # This n_expected is actually the minimum number of messages
-        # I will send.  The real number will be higher because some
-        # will be released when I close some receivers.
-        self.n_expected = 300
-        self.one_third  = self.n_expected / 3
-
+        # after send_batch sent messages are accepted, verify the closest
+        # receivers have consumed the batch.
+        self.send_batch = 4
+        self.n_sent = 0
         self.n_received = 0
+        self.sender = None
 
-        self.count_1_a = 0
-        self.count_1_b = 0
-        self.count_2_a = 0
-        self.count_2_b = 0
-        self.count_3_a = 0
-        self.count_3_b = 0
+        self.rx_opened  = 0
+        self.rx_count_1 = 0
+        self.rx_count_2 = 0
+        self.rx_count_3 = 0
+        self.closest_rx = None
 
-        self.addr_check_timer    = None
+        # for checking the number of subscribers to the destination address
+        self.addr_checker        = None
         self.addr_check_receiver = None
         self.addr_check_sender   = None
-        self.bailed              = False
-        self.test_name           = test_name
-
-        self.sender = None
-        self.n_sent_1 = 0
-        self.n_sent_2 = 0
-        self.n_sent_3 = 0
-
-        self.recv_1_a_closed = False
-        self.recv_1_b_closed = False
-        self.recv_2_a_closed = False
-        self.recv_2_b_closed = False
 
-        self.first_check = True
-        self.send_on_sendable = True
+        self._logger = Logger(title=test_name, print_to_console=print_debug)
 
-        self.print_debug = print_debug
+    def _new_message(self):
+        """Add expected rx for log tracing
+        """
+        return Message(body="%s: Hello, %s." % (self.test_name, 
self.closest_rx.name),
+                       address=self.dest)
 
     def timeout(self):
-        sys.stdout.flush()
-        self.bail("Timeout Expired ")
-
-    def address_check_timeout(self):
-        self.addr_check()
+        self.bail("Timeout Expired")
 
-    def bail(self, text):
+    def bail(self, error_text=None):
         self.timer.cancel()
-        self.error = text
+        self.error = error_text
         self.send_cnx.close()
         self.cnx_1.close()
         self.cnx_2.close()
         self.cnx_3.close()
-        if self.addr_check_timer:
-            self.addr_check_timer.cancel()
+        if error_text:
+            self._logger.log("Test failed: %s" % error_text)
+            self._logger.dump()
 
     def on_start(self, event):
         self.timer    = event.reactor.schedule(TIMEOUT, TestTimeout(self))
@@ -2259,185 +2275,170 @@ class ClosestTest (MessagingHandler):
         self.cnx_2    = event.container.connect(self.router_2)
         self.cnx_3    = event.container.connect(self.router_3)
 
-        # Warning!
-        # The two receiver-links on each router must be given
-        # explicit distinct names, or we will in fact only get
-        # one link.  And then wonder why receiver 2 on each
-        # router isn't getting any messages.
-        self.recv_1_a  = event.container.create_receiver(self.cnx_1, 
self.dest, name=link_name())
-        self.recv_1_b  = event.container.create_receiver(self.cnx_1, 
self.dest, name=link_name())
-
-        self.recv_2_a  = event.container.create_receiver(self.cnx_2,  
self.dest, name=link_name())
-        self.recv_2_b  = event.container.create_receiver(self.cnx_2,  
self.dest, name=link_name())
-
-        self.recv_3_a  = event.container.create_receiver(self.cnx_3,  
self.dest, name=link_name())
-        self.recv_3_b  = event.container.create_receiver(self.cnx_3,  
self.dest, name=link_name())
+        self.recv_1  = event.container.create_receiver(self.cnx_1, self.dest,
+                                                       name=link_name("rx1"))
+        self.recv_2  = event.container.create_receiver(self.cnx_2, self.dest,
+                                                       name=link_name("rx2"))
+        self.recv_3  = event.container.create_receiver(self.cnx_3, self.dest,
+                                                       name=link_name("rx3"))
 
-        self.recv_1_a.flow(self.n_expected)
-        self.recv_2_a.flow(self.n_expected)
-        self.recv_3_a.flow(self.n_expected)
+        # grant way more flow than necessary so we can consume any mis-routed
+        # message (that is a bug)
+        self.recv_1.flow(100)
+        self.recv_2.flow(100)
+        self.recv_3.flow(100)
 
-        self.recv_1_b.flow(self.n_expected)
-        self.recv_2_b.flow(self.n_expected)
-        self.recv_3_b.flow(self.n_expected)
-
-        self.addr_check_receiver = event.container.create_receiver(self.cnx_1, 
dynamic=True)
-        self.addr_check_receiver.flow(100)
-        self.addr_check_sender   = event.container.create_sender(self.cnx_1, 
"$management")
-
-        self.m_sent_1 = False
-        self.m_sent_2 = False
-        self.m_sent_3 = False
+        self.closest_rx = self.recv_1
 
     def on_link_opened(self, event):
-        if event.receiver == self.addr_check_receiver:
-            # my addr-check link has opened: make the addr_checker with the 
given address.
-            self.addr_checker = 
AddressChecker(self.addr_check_receiver.remote_source.address)
+        self._logger.log("Link opened: %s" % event.link.name)
+        if event.link.is_receiver:
+            if event.receiver == self.addr_check_receiver:
+                # Phase 2: address checker address available:
+                self._logger.log("Address check receiver link opened")
+                assert self.addr_checker is None
+                self.addr_checker = 
AddressChecker(self.addr_check_receiver.remote_source.address)
+                self.addr_check_sender = 
event.container.create_sender(self.cnx_1,
+                                                                       
"$management",
+                                                                       
name=link_name("check_tx"))
+            elif self.rx_opened != 3:
+                # Phase 1: wait for receivers to come up
+                assert event.receiver in [self.recv_1, self.recv_2, 
self.recv_3]
+                self._logger.log("Test receiver link opened: %s" % 
event.link.name)
+                self.rx_opened += 1
+                if self.rx_opened == 3:
+                    # All test receivers links open, start Phase 2:
+                    self._logger.log("Opening address check receiver")
+                    self.addr_check_receiver = 
event.container.create_receiver(self.cnx_1,
+                                                                               
dynamic=True,
+                                                                               
name=link_name("check_rx"))
+                    self.addr_check_receiver.flow(100)
+
+        elif event.sender == self.addr_check_sender:
+            # fire off the first address query
             self.addr_check()
 
     def on_message(self, event):
+        self._logger.log("on_message %s" % event.receiver.name)
         if event.receiver == self.addr_check_receiver:
             # This is a response to one of my address-readiness checking 
messages.
             response = 
self.addr_checker.parse_address_query_response(event.message)
-            if self.first_check:
-                if response.status_code == 200 and response.subscriberCount == 
2 and response.remoteCount == 2:
-                    # now we know that we have two subscribers on attached 
router, and two remote
-                    # routers that know about the address. The network is 
ready.
-                    # Now we can make the sender without getting a
-                    # "No Path To Destination" error.
-                    self.sender = event.container.create_sender(self.send_cnx, 
self.dest)
-
-                    if not self.m_sent_1:
-                        self.m_sent_1 = True
-                        while self.n_sent_1 < self.one_third:
-                            msg = Message(body="Hello, closest.",
-                                          address=self.dest)
-                            self.sender.send(msg)
-                            self.n_sent_1 += 1
-                        if self.print_debug:
-                            print("First hundred sent")
-
-                    # And we can quit checking.
-                    if self.addr_check_timer:
-                        self.addr_check_timer.cancel()
-                        self.addr_check_timer = None
-                else:
-                    # If the latest check did not find the link-attack route 
ready,
-                    # schedule another check a little while from now.
-                    self.addr_check_timer = event.reactor.schedule(3, 
AddressCheckerTimeout(self))
+            if response.status_code == 200 and response.subscriberCount == 1 
and response.remoteCount == 2:
+                # now we know that we have two subscribers on attached router, 
and two remote
+                # routers that know about the address. The network is ready.
+                self._logger.log("Network ready")
+                assert self.sender is None
+                self.sender = event.container.create_sender(self.send_cnx,
+                                                            self.dest,
+                                                            
name=link_name("sender"))
             else:
-                if response.status_code == 200 and response.subscriberCount == 
0 and response.remoteCount == 1:
-                    if not self.m_sent_3:
-                        self.m_sent_3 = True
-                        while self.n_sent_2 < self.one_third:
-                            msg = Message(body="Hello, closest.",
-                                          address=self.dest)
-                            self.sender.send(msg)
-                            self.n_sent_2 += 1
-
-                        if self.print_debug:
-                            print("Third hundred sent")
-
-                    if self.addr_check_timer:
-                        self.addr_check_timer.cancel()
-                        self.addr_check_timer = None
-                else:
-                    # If the latest check did not find the link-attack route 
ready,
-                    # schedule another check a little while from now.
-                    self.addr_check_timer = event.reactor.schedule(3,
-                                                                   
AddressCheckerTimeout(
-                                                                       self))
-        else :
+                # Not ready yet - poll again. This will continue until either
+                # the routers have updated or the test times out
+                self._logger.log("Network not ready yet: %s" % response)
+                self.addr_check_receiver.flow(1)
+                sleep(0.25)
+                self.addr_check()
+        else:
             # This is a payload message.
             self.n_received += 1
 
             # Count the messages that have come in for
             # each receiver.
-            if event.receiver == self.recv_1_a:
-                self.count_1_a += 1
-            elif event.receiver == self.recv_1_b:
-                self.count_1_b += 1
-            elif event.receiver == self.recv_2_a:
-                self.count_2_a += 1
-            elif event.receiver == self.recv_2_b:
-                self.count_2_b += 1
-            elif event.receiver == self.recv_3_a:
-                self.count_3_a += 1
-                if self.print_debug:
-                    print("self.count_3_a", self.count_3_a)
-            elif event.receiver == self.recv_3_b:
-                self.count_3_b += 1
-                if self.print_debug:
-                    print("self.count_3_b", self.count_3_b)
-
-            if self.n_received == self.one_third:
-                if self.print_debug:
-                    print("First one third received")
-                # The first one-third of messages should have gone exclusively
-                # to the near receivers.  At this point we should have
-                # no messages in the mid or far receivers.
-                self.recv_1_a.close()
-                self.recv_1_b.close()
-                if (self.count_2_a + self.count_2_b + self.count_3_a + 
self.count_3_b) > 0 :
-                    self.bail("error: routers 2 or 3 got messages before 
router 1 receivers were closed.")
-                # Make sure both receivers got some messages.
-                if (self.count_1_a < self.one_third / 2 or self.count_1_b < 
self.one_third / 2) or (self.count_1_b != self.count_1_a):
-                    self.bail("error: recv_1_a and  recv_1_b did not get equal 
number of messages")
-
-            elif self.n_received == 2 * self.one_third:
-                if self.print_debug:
-                    print("Second one third received")
-                # The next one-third of messages should have gone exclusively
-                # to the router_2 receivers.  At this point we should have
-                # no messages in the far receivers.
-                self.recv_2_a.close()
-                self.recv_2_b.close()
-                if (self.count_3_a + self.count_3_b) > 0 :
-                    self.bail("error: router 3 got messages before 2 was 
closed.")
-                # Make sure both receivers got some messages.
-                if (self.count_2_a < self.one_third / 2 or self.count_2_b < 
self.one_third / 2) or (self.count_2_b != self.count_2_a):
-                    self.bail("error: recv_2_a and  recv_2_b did not get equal 
number of messages")
-
-            # By the time we reach the expected number of messages
-            # we have closed the router_1 and router_2 receivers.  If the
-            # router_3 receivers are empty at this point, something is wrong.
-            if self.n_received >= self.n_expected :
-                if self.print_debug:
-                    print("Third one third received")
-                if (self.count_3_a < self.one_third / 2 or self.count_3_b < 
self.one_third / 2) or (self.count_3_b != self.count_3_a):
-                    self.bail("error: recv_3_a and  recv_3_b did not get equal 
number of messages")
+            if event.receiver == self.recv_1:
+                self.rx_count_1 += 1
+                self._logger.log("RX 1 got message, total=%s" % 
self.rx_count_1)
+            elif event.receiver == self.recv_2:
+                self.rx_count_2 += 1
+                self._logger.log("RX 2 got message, total=%s" % 
self.rx_count_2)
+            elif event.receiver == self.recv_3:
+                self.rx_count_3 += 1
+                self._logger.log("RX 3 got message, total=%s" % 
self.rx_count_3)
+            else:
+                self.bail("Unexpected receiver?")
+
+    def on_sendable(self, event):
+        self._logger.log("on_sendable %s" % event.sender.name)
+        if event.sender == self.sender:
+            if self.n_sent == 0:
+                # only have one message outstanding
+                self._logger.log("sending (sent=%s)" % self.n_sent)
+                self.sender.send(self._new_message())
+                self.n_sent += 1
+
+    def on_settled(self, event):
+        self._logger.log("On settled, link: %s" % event.link.name)
+        if event.link == self.sender:
+            dlv = event.delivery
+            if dlv.remote_state == Delivery.ACCEPTED:
+                if self.closest_rx == self.recv_1:
+                    if self.rx_count_2 or self.rx_count_3:
+                        self.bail("Error: non-closest client got message!")
+                    else:
+                        self.rx_count_1 += 1
+                        if self.rx_count_1 == self.send_batch:
+                            self._logger.log("RX 1 complete, closing")
+                            self.recv_1.close()
+                            self.closest_rx = self.recv_2
+                            # now wait for close to complete before sending 
more
+                        else:
+                            self.sender.send(self._new_message())
+                            self.n_sent += 1
+                elif self.closest_rx == self.recv_2:
+                    if self.rx_count_1 != self.send_batch or self.rx_count_3:
+                        self.bail("Error: non-closest client got message!")
+                    else:
+                        self.rx_count_2 += 1
+                        if self.rx_count_2 == self.send_batch:
+                            self._logger.log("RX 2 complete, closing")
+                            self.recv_2.close()
+                            self.closest_rx = self.recv_3
+                            # now wait for close to complete before sending 
more
+                        else:
+                            self.sender.send(self._new_message())
+                            self.n_sent += 1
+                elif self.closest_rx == self.recv_3:
+                    if (self.rx_count_1 != self.send_batch or self.rx_count_2 
!= self.send_batch):
+                        self.bail("Error: non-closest client got message!")
+                    else:
+                        self.rx_count_3 += 1
+                        if self.rx_count_3 == self.send_batch:
+                            self._logger.log("RX 3 complete, closing, Test 
Done!")
+                            self.recv_3.close()
+                            self.closest_rx = None
+                            self.bail()
+                        else:
+                            self.sender.send(self._new_message())
+                            self.n_sent += 1
                 else:
-                    self.bail(None)
+                    self.bail("Error: self.closest_rx no match!")
+            else:
+                self._logger.log("Delivery Not Accepted: %s" % 
dlv.remote_state)
+                if dlv.remote_state == Delivery.RELEASED:
+                    # This occurs when the loss-of-rx-link event has not been
+                    # propagated to all routers. When that happens the message
+                    # may be forwarded back to the router with the last known
+                    # location of the dropped link.
+                    # This is expected, just try again
+                    sleep(0.25)
+                    self.sender.send(self._new_message())
+                    self.n_sent += 1
+                else:
+                    self.bail("Error: unexpected delivery failure: %s"
+                              % dlv.remote_state)
 
     def on_link_closed(self, event):
-        if event.receiver == self.recv_1_b or event.receiver == self.recv_1_a:
-            if event.receiver == self.recv_1_a:
-                self.recv_1_a_closed = True
-            if event.receiver == self.recv_1_b:
-                self.recv_1_b_closed = True
-            if self.recv_1_a_closed and self.recv_1_b_closed and not 
self.m_sent_2:
-                if self.print_debug:
-                    print("self.recv_1_a_closed and self.recv_1_b_closed")
-
-                self.m_sent_2 = True
-                while self.n_sent_3 < self.one_third:
-                    msg = Message(body="Hello, closest.",
-                                  address=self.dest)
-                    self.sender.send(msg)
-                    self.n_sent_3 += 1
-                if self.print_debug:
-                    print("Second hundred sent")
-
-        if event.receiver == self.recv_2_a or event.receiver == self.recv_2_b:
-            if event.receiver == self.recv_2_a:
-                self.recv_2_a_closed = True
-            if event.receiver == self.recv_2_b:
-                self.recv_2_b_closed = True
-            if self.recv_2_a_closed and self.recv_2_b_closed:
-                self.first_check = False
-                if self.print_debug:
-                    print("self.recv_2_a_closed and self.recv_2_b_closed")
-                self.addr_check()
+        self._logger.log("Link closed %s" % event.link.name)
+        if event.link == self.recv_1 and self.closest_rx == self.recv_2:
+            self._logger.log("Next send for RX 2")
+            sleep(2.0)  # give time for the link loss to propagate
+            self.sender.send(self._new_message())
+            self.n_sent += 1
+
+        elif event.link == self.recv_2 and self.closest_rx == self.recv_3:
+            self._logger.log("Next send for RX 3")
+            sleep(2.0)  # give time for the link loss to propagate
+            self.sender.send(self._new_message())
+            self.n_sent += 1
 
     def addr_check(self):
         # Send the message that will query the management code to discover
@@ -2447,6 +2448,7 @@ class ClosestTest (MessagingHandler):
         # BUGALERT: We have to prepend the 'M0' to this address prefix
         # because that's what the router does internally.  Someday this
         # may change.
+        self._logger.log("Query addresses...")
         self.addr_check_sender.send(self.addr_checker.make_address_query("M0" 
+ self.dest))
 
     def run(self):

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to