This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new b12d49a DISPATCH-2227: simplify the system_tests_distribution.test_09_closest_linear b12d49a is described below commit b12d49a18f5bf055487e1809f854024c935b36a0 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Fri Sep 17 14:59:58 2021 -0400 DISPATCH-2227: simplify the system_tests_distribution.test_09_closest_linear This closes #1370 --- tests/system_tests_distribution.py | 420 +++++++++++++++++++------------------ 1 file changed, 211 insertions(+), 209 deletions(-) diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py index 4e200f7..e6126a7 100644 --- a/tests/system_tests_distribution.py +++ b/tests/system_tests_distribution.py @@ -17,11 +17,11 @@ # under the License. # -import sys +from time import sleep -from proton import Message +from proton import Message, Delivery from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, TestTimeout -from system_test import unittest +from system_test import unittest, Logger from proton.handlers import MessagingHandler from proton.reactor import Container, LinkOption, ApplicationEvent, EventInjector @@ -44,6 +44,10 @@ class AddressCheckResponse(object): def __getattr__(self, key): return self.attrs[key] + def __str__(self): + return "Address Check Response: status=%s desc=%s attrs=%s" % \ + (self.status_code, self.status_description, self.attrs) + class AddressChecker (object): """ @@ -1643,9 +1647,10 @@ class DistributionTests (TestCase): next_link_sequence = 1 -def link_name(): +def link_name(suffix=None): global next_link_sequence - name = "link-name.%d" % next_link_sequence + suffix = suffix or "name" + name = "link-%s.%d" % (suffix, next_link_sequence) next_link_sequence += 1 return name @@ -2172,26 +2177,50 @@ class LinkAttachRouting (MessagingHandler): class ClosestTest (MessagingHandler): """ - Test whether distance-based message routing works in a - 3-router network. 
The network may be linear or mesh, - depending on which routers the caller gives us. + network. The network may be linear or mesh, depending on which routers the + caller gives us. (Illustration is a linear network.) sender -----> Router_1 -----> Router_2 -----> Router_3 | | | v v v - rcvr_1_a rcvr_2_a rcvr_3_a - rcvr_1_b rcvr_2_b rcvr_3_b + rcvr_1 rcvr_2 rcvr_3 + + With a linear network of 3 routers, set up a sender on router_1, and then one + receiver on each of the 3 routers. Requirement: router 2 is closer than + router 3 by one hop. + + Once the closest receiver has received the required number of + messages it is closed. Neither of the other receivers should have + received any messages, as they were not the closest receivers. + + Repeat until all three receivers have received messages. + + The test is set up in phases to ensure there are no races between fast/slow + clients and routers: + + Phase 1: bring up all connections and create receivers, wait until all + receivers have finished link setup. - With a linear network of 3 routers, set up a sender on - router_1, and then 2 receivers each on all 3 routers. + Phase 2: poll routers until the subscriber count shows all receivers are + ready. + Phase 3: start the sender, wait until on_sendable triggers. + + Phase 4: send test messages, verify distribution. + + Once a batch of messages has completely arrived at the current closest + receiver, close that receiver. Note that it can take a few seconds for + that loss of link to propagate to all three routers. During that time any + sent messages may fail with outcome RELEASED - that is expected.
""" def __init__(self, test_name, router_1, router_2, router_3, addr_suffix, print_debug=False): super(ClosestTest, self).__init__(prefetch=0) + self.test_name = test_name self.error = None self.router_1 = router_1 self.router_2 = router_2 @@ -2199,58 +2228,45 @@ class ClosestTest (MessagingHandler): self.addr_suffix = addr_suffix self.dest = "closest/" + addr_suffix - # This n_expected is actually the minimum number of messages - # I will send. The real number will be higher because some - # will be released when I close some receivers. - self.n_expected = 300 - self.one_third = self.n_expected / 3 - + # after send_batch sent messages are accepted, verify the closest + # receivers have consumed the batch. + self.send_batch = 4 + self.n_sent = 0 self.n_received = 0 + self.sender = None - self.count_1_a = 0 - self.count_1_b = 0 - self.count_2_a = 0 - self.count_2_b = 0 - self.count_3_a = 0 - self.count_3_b = 0 + self.rx_opened = 0 + self.rx_count_1 = 0 + self.rx_count_2 = 0 + self.rx_count_3 = 0 + self.closest_rx = None - self.addr_check_timer = None + # for checking the number of subscribers to the destination address + self.addr_checker = None self.addr_check_receiver = None self.addr_check_sender = None - self.bailed = False - self.test_name = test_name - - self.sender = None - self.n_sent_1 = 0 - self.n_sent_2 = 0 - self.n_sent_3 = 0 - - self.recv_1_a_closed = False - self.recv_1_b_closed = False - self.recv_2_a_closed = False - self.recv_2_b_closed = False - self.first_check = True - self.send_on_sendable = True + self._logger = Logger(title=test_name, print_to_console=print_debug) - self.print_debug = print_debug + def _new_message(self): + """Add expected rx for log tracing + """ + return Message(body="%s: Hello, %s." 
% (self.test_name, self.closest_rx.name), + address=self.dest) def timeout(self): - sys.stdout.flush() - self.bail("Timeout Expired ") - - def address_check_timeout(self): - self.addr_check() + self.bail("Timeout Expired") - def bail(self, text): + def bail(self, error_text=None): self.timer.cancel() - self.error = text + self.error = error_text self.send_cnx.close() self.cnx_1.close() self.cnx_2.close() self.cnx_3.close() - if self.addr_check_timer: - self.addr_check_timer.cancel() + if error_text: + self._logger.log("Test failed: %s" % error_text) + self._logger.dump() def on_start(self, event): self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) @@ -2259,185 +2275,170 @@ class ClosestTest (MessagingHandler): self.cnx_2 = event.container.connect(self.router_2) self.cnx_3 = event.container.connect(self.router_3) - # Warning! - # The two receiver-links on each router must be given - # explicit distinct names, or we will in fact only get - # one link. And then wonder why receiver 2 on each - # router isn't getting any messages. 
- self.recv_1_a = event.container.create_receiver(self.cnx_1, self.dest, name=link_name()) - self.recv_1_b = event.container.create_receiver(self.cnx_1, self.dest, name=link_name()) - - self.recv_2_a = event.container.create_receiver(self.cnx_2, self.dest, name=link_name()) - self.recv_2_b = event.container.create_receiver(self.cnx_2, self.dest, name=link_name()) - - self.recv_3_a = event.container.create_receiver(self.cnx_3, self.dest, name=link_name()) - self.recv_3_b = event.container.create_receiver(self.cnx_3, self.dest, name=link_name()) + self.recv_1 = event.container.create_receiver(self.cnx_1, self.dest, + name=link_name("rx1")) + self.recv_2 = event.container.create_receiver(self.cnx_2, self.dest, + name=link_name("rx2")) + self.recv_3 = event.container.create_receiver(self.cnx_3, self.dest, + name=link_name("rx3")) - self.recv_1_a.flow(self.n_expected) - self.recv_2_a.flow(self.n_expected) - self.recv_3_a.flow(self.n_expected) + # grant way more flow than necessary so we can consume any mis-routed + # message (that is a bug) + self.recv_1.flow(100) + self.recv_2.flow(100) + self.recv_3.flow(100) - self.recv_1_b.flow(self.n_expected) - self.recv_2_b.flow(self.n_expected) - self.recv_3_b.flow(self.n_expected) - - self.addr_check_receiver = event.container.create_receiver(self.cnx_1, dynamic=True) - self.addr_check_receiver.flow(100) - self.addr_check_sender = event.container.create_sender(self.cnx_1, "$management") - - self.m_sent_1 = False - self.m_sent_2 = False - self.m_sent_3 = False + self.closest_rx = self.recv_1 def on_link_opened(self, event): - if event.receiver == self.addr_check_receiver: - # my addr-check link has opened: make the addr_checker with the given address. 
- self.addr_checker = AddressChecker(self.addr_check_receiver.remote_source.address) + self._logger.log("Link opened: %s" % event.link.name) + if event.link.is_receiver: + if event.receiver == self.addr_check_receiver: + # Phase 2: address checker address available: + self._logger.log("Address check receiver link opened") + assert self.addr_checker is None + self.addr_checker = AddressChecker(self.addr_check_receiver.remote_source.address) + self.addr_check_sender = event.container.create_sender(self.cnx_1, + "$management", + name=link_name("check_tx")) + elif self.rx_opened != 3: + # Phase 1: wait for receivers to come up + assert event.receiver in [self.recv_1, self.recv_2, self.recv_3] + self._logger.log("Test receiver link opened: %s" % event.link.name) + self.rx_opened += 1 + if self.rx_opened == 3: + # All test receivers links open, start Phase 2: + self._logger.log("Opening address check receiver") + self.addr_check_receiver = event.container.create_receiver(self.cnx_1, + dynamic=True, + name=link_name("check_rx")) + self.addr_check_receiver.flow(100) + + elif event.sender == self.addr_check_sender: + # fire off the first address query self.addr_check() def on_message(self, event): + self._logger.log("on_message %s" % event.receiver.name) if event.receiver == self.addr_check_receiver: # This is a response to one of my address-readiness checking messages. response = self.addr_checker.parse_address_query_response(event.message) - if self.first_check: - if response.status_code == 200 and response.subscriberCount == 2 and response.remoteCount == 2: - # now we know that we have two subscribers on attached router, and two remote - # routers that know about the address. The network is ready. - # Now we can make the sender without getting a - # "No Path To Destination" error. 
- self.sender = event.container.create_sender(self.send_cnx, self.dest) - - if not self.m_sent_1: - self.m_sent_1 = True - while self.n_sent_1 < self.one_third: - msg = Message(body="Hello, closest.", - address=self.dest) - self.sender.send(msg) - self.n_sent_1 += 1 - if self.print_debug: - print("First hundred sent") - - # And we can quit checking. - if self.addr_check_timer: - self.addr_check_timer.cancel() - self.addr_check_timer = None - else: - # If the latest check did not find the link-attack route ready, - # schedule another check a little while from now. - self.addr_check_timer = event.reactor.schedule(3, AddressCheckerTimeout(self)) + if response.status_code == 200 and response.subscriberCount == 1 and response.remoteCount == 2: + # now we know that we have one subscriber on the attached router, and two remote + # routers that know about the address. The network is ready. + self._logger.log("Network ready") + assert self.sender is None + self.sender = event.container.create_sender(self.send_cnx, + self.dest, + name=link_name("sender")) else: - if response.status_code == 200 and response.subscriberCount == 0 and response.remoteCount == 1: - if not self.m_sent_3: - self.m_sent_3 = True - while self.n_sent_2 < self.one_third: - msg = Message(body="Hello, closest.", - address=self.dest) - self.sender.send(msg) - self.n_sent_2 += 1 - - if self.print_debug: - print("Third hundred sent") - - if self.addr_check_timer: - self.addr_check_timer.cancel() - self.addr_check_timer = None - else: - # If the latest check did not find the link-attack route ready, - # schedule another check a little while from now. - self.addr_check_timer = event.reactor.schedule(3, - AddressCheckerTimeout( - self)) - else : + # Not ready yet - poll again.
This will continue until either + # the routers have updated or the test times out + self._logger.log("Network not ready yet: %s" % response) + self.addr_check_receiver.flow(1) + sleep(0.25) + self.addr_check() + else: # This is a payload message. self.n_received += 1 # Count the messages that have come in for # each receiver. - if event.receiver == self.recv_1_a: - self.count_1_a += 1 - elif event.receiver == self.recv_1_b: - self.count_1_b += 1 - elif event.receiver == self.recv_2_a: - self.count_2_a += 1 - elif event.receiver == self.recv_2_b: - self.count_2_b += 1 - elif event.receiver == self.recv_3_a: - self.count_3_a += 1 - if self.print_debug: - print("self.count_3_a", self.count_3_a) - elif event.receiver == self.recv_3_b: - self.count_3_b += 1 - if self.print_debug: - print("self.count_3_b", self.count_3_b) - - if self.n_received == self.one_third: - if self.print_debug: - print("First one third received") - # The first one-third of messages should have gone exclusively - # to the near receivers. At this point we should have - # no messages in the mid or far receivers. - self.recv_1_a.close() - self.recv_1_b.close() - if (self.count_2_a + self.count_2_b + self.count_3_a + self.count_3_b) > 0 : - self.bail("error: routers 2 or 3 got messages before router 1 receivers were closed.") - # Make sure both receivers got some messages. - if (self.count_1_a < self.one_third / 2 or self.count_1_b < self.one_third / 2) or (self.count_1_b != self.count_1_a): - self.bail("error: recv_1_a and recv_1_b did not get equal number of messages") - - elif self.n_received == 2 * self.one_third: - if self.print_debug: - print("Second one third received") - # The next one-third of messages should have gone exclusively - # to the router_2 receivers. At this point we should have - # no messages in the far receivers. 
- self.recv_2_a.close() - self.recv_2_b.close() - if (self.count_3_a + self.count_3_b) > 0 : - self.bail("error: router 3 got messages before 2 was closed.") - # Make sure both receivers got some messages. - if (self.count_2_a < self.one_third / 2 or self.count_2_b < self.one_third / 2) or (self.count_2_b != self.count_2_a): - self.bail("error: recv_2_a and recv_2_b did not get equal number of messages") - - # By the time we reach the expected number of messages - # we have closed the router_1 and router_2 receivers. If the - # router_3 receivers are empty at this point, something is wrong. - if self.n_received >= self.n_expected : - if self.print_debug: - print("Third one third received") - if (self.count_3_a < self.one_third / 2 or self.count_3_b < self.one_third / 2) or (self.count_3_b != self.count_3_a): - self.bail("error: recv_3_a and recv_3_b did not get equal number of messages") + if event.receiver == self.recv_1: + self.rx_count_1 += 1 + self._logger.log("RX 1 got message, total=%s" % self.rx_count_1) + elif event.receiver == self.recv_2: + self.rx_count_2 += 1 + self._logger.log("RX 2 got message, total=%s" % self.rx_count_2) + elif event.receiver == self.recv_3: + self.rx_count_3 += 1 + self._logger.log("RX 3 got message, total=%s" % self.rx_count_3) + else: + self.bail("Unexpected receiver?") + + def on_sendable(self, event): + self._logger.log("on_sendable %s" % event.sender.name) + if event.sender == self.sender: + if self.n_sent == 0: + # only have one message outstanding + self._logger.log("sending (sent=%s)" % self.n_sent) + self.sender.send(self._new_message()) + self.n_sent += 1 + + def on_settled(self, event): + self._logger.log("On settled, link: %s" % event.link.name) + if event.link == self.sender: + dlv = event.delivery + if dlv.remote_state == Delivery.ACCEPTED: + if self.closest_rx == self.recv_1: + if self.rx_count_2 or self.rx_count_3: + self.bail("Error: non-closest client got message!") + else: + self.rx_count_1 += 1 + if 
self.rx_count_1 == self.send_batch: + self._logger.log("RX 1 complete, closing") + self.recv_1.close() + self.closest_rx = self.recv_2 + # now wait for close to complete before sending more + else: + self.sender.send(self._new_message()) + self.n_sent += 1 + elif self.closest_rx == self.recv_2: + if self.rx_count_1 != self.send_batch or self.rx_count_3: + self.bail("Error: non-closest client got message!") + else: + self.rx_count_2 += 1 + if self.rx_count_2 == self.send_batch: + self._logger.log("RX 2 complete, closing") + self.recv_2.close() + self.closest_rx = self.recv_3 + # now wait for close to complete before sending more + else: + self.sender.send(self._new_message()) + self.n_sent += 1 + elif self.closest_rx == self.recv_3: + if (self.rx_count_1 != self.send_batch or self.rx_count_2 != self.send_batch): + self.bail("Error: non-closest client got message!") + else: + self.rx_count_3 += 1 + if self.rx_count_3 == self.send_batch: + self._logger.log("RX 3 complete, closing, Test Done!") + self.recv_3.close() + self.closest_rx = None + self.bail() + else: + self.sender.send(self._new_message()) + self.n_sent += 1 else: - self.bail(None) + self.bail("Error: self.closest_rx no match!") + else: + self._logger.log("Delivery Not Accepted: %s" % dlv.remote_state) + if dlv.remote_state == Delivery.RELEASED: + # This occurs when the loss-of-rx-link event has not been + # propagated to all routers. When that happens the message + # may be forwarded back to the router with the last known + # location of the dropped link. 
+ # This is expected, just try again + sleep(0.25) + self.sender.send(self._new_message()) + self.n_sent += 1 + else: + self.bail("Error: unexpected delivery failure: %s" + % dlv.remote_state) def on_link_closed(self, event): - if event.receiver == self.recv_1_b or event.receiver == self.recv_1_a: - if event.receiver == self.recv_1_a: - self.recv_1_a_closed = True - if event.receiver == self.recv_1_b: - self.recv_1_b_closed = True - if self.recv_1_a_closed and self.recv_1_b_closed and not self.m_sent_2: - if self.print_debug: - print("self.recv_1_a_closed and self.recv_1_b_closed") - - self.m_sent_2 = True - while self.n_sent_3 < self.one_third: - msg = Message(body="Hello, closest.", - address=self.dest) - self.sender.send(msg) - self.n_sent_3 += 1 - if self.print_debug: - print("Second hundred sent") - - if event.receiver == self.recv_2_a or event.receiver == self.recv_2_b: - if event.receiver == self.recv_2_a: - self.recv_2_a_closed = True - if event.receiver == self.recv_2_b: - self.recv_2_b_closed = True - if self.recv_2_a_closed and self.recv_2_b_closed: - self.first_check = False - if self.print_debug: - print("self.recv_2_a_closed and self.recv_2_b_closed") - self.addr_check() + self._logger.log("Link closed %s" % event.link.name) + if event.link == self.recv_1 and self.closest_rx == self.recv_2: + self._logger.log("Next send for RX 2") + sleep(2.0) # give time for the link loss to propagate + self.sender.send(self._new_message()) + self.n_sent += 1 + + elif event.link == self.recv_2 and self.closest_rx == self.recv_3: + self._logger.log("Next send for RX 3") + sleep(2.0) # give time for the link loss to propagate + self.sender.send(self._new_message()) + self.n_sent += 1 def addr_check(self): # Send the message that will query the management code to discover @@ -2447,6 +2448,7 @@ class ClosestTest (MessagingHandler): # BUGALERT: We have to prepend the 'M0' to this address prefix # because that's what the router does internally. Someday this # may change. 
+ self._logger.log("Query addresses...") self.addr_check_sender.send(self.addr_checker.make_address_query("M0" + self.dest)) def run(self): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org