Repository: qpid-dispatch Updated Branches: refs/heads/master b38e63edf -> 889644675
DISPATCH-209 : add multicast {linear,mesh} tests Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/88964467 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/88964467 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/88964467 Branch: refs/heads/master Commit: 8896446756dc158673c9222e8bdb13bf41bfbc97 Parents: b38e63e Author: mick goulish <mgoul...@redhat.com> Authored: Fri Aug 18 16:39:19 2017 -0400 Committer: mick goulish <mgoul...@redhat.com> Committed: Fri Aug 18 16:39:19 2017 -0400 ---------------------------------------------------------------------- tests/system_tests_distribution.py | 207 +++++++++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88964467/tests/system_tests_distribution.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_distribution.py b/tests/system_tests_distribution.py index 5cfc23d..c244c54 100644 --- a/tests/system_tests_distribution.py +++ b/tests/system_tests_distribution.py @@ -121,8 +121,9 @@ class DistributionTests ( TestCase ): def router(name, more_config): config = [ ('router', {'mode': 'interior', 'id': name}), - ('address', {'prefix': 'closest', 'distribution': 'closest'}), - ('address', {'prefix': 'balanced', 'distribution': 'balanced'}) + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) ] + more_config config = Qdrouterd.Config(config) @@ -422,7 +423,7 @@ class DistributionTests ( TestCase ): "addr_08" ) test.run() - self.assertEqual(None, test.error) + self.assertEqual ( None, test.error ) def test_09_closest_mesh ( self ): @@ -524,7 +525,7 @@ class 
DistributionTests ( TestCase ): omit_middle_receiver ) test.run() - self.assertEqual(None, test.error) + self.assertEqual ( None, test.error ) def test_11_balanced_linear_omit_middle_receiver ( self ): @@ -638,6 +639,26 @@ class DistributionTests ( TestCase ): self.assertEqual ( None, test.error ) + def test_13_multicast_linear ( self ): + test = MulticastTest ( self.A_addr, + self.B_addr, + self.C_addr, + "addr_13" + ) + test.run() + self.assertEqual ( None, test.error ) + + + def test_14_multicast_mesh ( self ): + test = MulticastTest ( self.A_addr, + self.B_addr, + self.D_addr, + "addr_14" + ) + test.run() + self.assertEqual ( None, test.error ) + + @@ -1406,5 +1427,183 @@ class BalancedTest ( MessagingHandler ): +class MulticastTest ( MessagingHandler ): + """ + Using multicast, we should see all receivers get everything, + whether the topology is linear or mesh. + """ + def __init__ ( self, router_1, router_2, router_3, addr_suffix ): + super ( MulticastTest, self ).__init__(prefetch=0) + self.error = None + self.router_1 = router_1 + self.router_2 = router_2 + self.router_3 = router_3 + self.addr_suffix = addr_suffix + self.dest = "multicast/" + addr_suffix + + self.n_to_send = 100 + self.n_sent = 0 + + self.n_received = 0 + + self.count_1_a = 0 + self.count_1_b = 0 + self.count_2_a = 0 + self.count_2_b = 0 + self.count_3_a = 0 + self.count_3_b = 0 + + self.addr_check_timer = None + self.addr_check_receiver = None + self.addr_check_sender = None + self.sender = None + self.bailed = False + + def timeout ( self ): + self.check_results ( ) + self.bail ( "Timeout Expired " ) + + + def address_check_timeout(self): + self.addr_check() + + + def bail ( self, text ): + self.timer.cancel() + self.error = text + self.send_cnx.close() + self.cnx_1.close() + self.cnx_2.close() + self.cnx_3.close() + if self.addr_check_timer: + self.addr_check_timer.cancel() + + + def on_start ( self, event ): + self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) + 
self.send_cnx = event.container.connect ( self.router_1 ) + self.cnx_1 = event.container.connect ( self.router_1 ) + self.cnx_2 = event.container.connect ( self.router_2 ) + self.cnx_3 = event.container.connect ( self.router_3 ) + + # Warning! + # The two receiver-links on each router must be given + # explicit distinct names, or we will in fact only get + # one link. And then wonder why receiver 2 on each + # router isn't getting any messages. + self.recv_1_a = event.container.create_receiver ( self.cnx_1, self.dest, name="1" ) + self.recv_1_b = event.container.create_receiver ( self.cnx_1, self.dest, name="2" ) + + self.recv_2_a = event.container.create_receiver ( self.cnx_2, self.dest, name="1" ) + self.recv_2_b = event.container.create_receiver ( self.cnx_2, self.dest, name="2" ) + + self.recv_3_a = event.container.create_receiver ( self.cnx_3, self.dest, name="1" ) + self.recv_3_b = event.container.create_receiver ( self.cnx_3, self.dest, name="2" ) + + self.recv_1_a.flow ( self.n_to_send ) + self.recv_2_a.flow ( self.n_to_send ) + self.recv_3_a.flow ( self.n_to_send ) + + self.recv_1_b.flow ( self.n_to_send ) + self.recv_2_b.flow ( self.n_to_send ) + self.recv_3_b.flow ( self.n_to_send ) + + self.addr_check_receiver = event.container.create_receiver ( self.cnx_1, dynamic=True ) + self.addr_check_sender = event.container.create_sender ( self.cnx_1, "$management" ) + + + def on_link_opened(self, event): + if event.receiver: + event.receiver.flow ( self.n_to_send ) + if event.receiver == self.addr_check_receiver: + # my addr-check link has opened: make the addr_checker with the given address. 
+ self.addr_checker = AddressChecker ( self.addr_check_receiver.remote_source.address ) + self.addr_check() + + + def on_sendable ( self, event ): + if self.sender and self.n_sent < self.n_to_send : + msg = Message ( body = "Hello, closest.", + address = self.dest + ) + dlv = self.sender.send ( msg ) + self.n_sent += 1 + dlv.settle() + + + def on_message ( self, event ): + + #if self.bailed is True : + #return + + if event.receiver == self.addr_check_receiver: + # This is a response to one of my address-readiness checking messages. + response = self.addr_checker.parse_address_query_response(event.message) + if response.status_code == 200 and response.subscriberCount == 2 and response.remoteCount == 2: + # now we know that we have two subscribers on attached router, and two remote + # routers that know about the address. The network is ready. + # Now we can make the sender without getting a + # "No Path To Destination" error. + self.sender = event.container.create_sender ( self.send_cnx, self.dest ) + + # And we can quit checking. + if self.addr_check_timer: + self.addr_check_timer.cancel() + self.addr_check_timer = None + else: + # If the latest check did not find the address ready on the network, + # schedule another check a little while from now. + self.addr_check_timer = event.reactor.schedule(0.25, AddressCheckerTimeout(self)) + else : + # This is a payload message. + self.n_received += 1 + + # Count the messages that have come in for + # each receiver. + if event.receiver == self.recv_1_a: + self.count_1_a += 1 + elif event.receiver == self.recv_1_b: + self.count_1_b += 1 + elif event.receiver == self.recv_2_a: + self.count_2_a += 1 + elif event.receiver == self.recv_2_b: + self.count_2_b += 1 + elif event.receiver == self.recv_3_a: + self.count_3_a += 1 + elif event.receiver == self.recv_3_b: + self.count_3_b += 1 + + if self.n_received >= 6 * self.n_to_send : + # In multicast, everybody gets everything. 
+ # Our reception count should be 6x our send-count, + # and all receiver-counts should be equal. + if self.count_1_a == self.count_1_b and self.count_1_b == self.count_2_a and self.count_2_a == self.count_2_b and self.count_2_b == self.count_3_a and self.count_3_a == self.count_3_b : + self.bail ( None ) + self.bailed = True + else: + self.bail ( "receivers not equal: %d %d %d %d %d %d" % (self.count_1_a, self.count_1_b, self.count_2_a, self.count_2_b, self.count_3_a, self.count_3_b) ) + self.bailed = True + + + + def addr_check ( self ): + # Send the message that will query the management code to discover + # information about our destination address. We cannot make our payload + # sender until the network is ready. + # + # BUGALERT: We have to prepend the 'M0' to this address prefix + # because that's what the router does internally. Someday this + # may change. + self.addr_check_sender.send ( self.addr_checker.make_address_query("M0" + self.dest) ) + + + def run(self): + container = Container(self) + container.run() + + + + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org