Repository: qpid-dispatch Updated Branches: refs/heads/master b38d81d0f -> ee4285f9b
make all receivers get messages Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ee4285f9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ee4285f9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ee4285f9 Branch: refs/heads/master Commit: ee4285f9bcceb788d0cd5c7d4da01991733cdd55 Parents: b38d81d Author: mick goulish <mgoul...@redhat.com> Authored: Fri Jul 21 20:06:14 2017 -0400 Committer: mick goulish <mgoul...@redhat.com> Committed: Fri Jul 21 20:06:14 2017 -0400 ---------------------------------------------------------------------- tests/system_tests_three_routers.py | 188 ++++++++++++++++++++++++++++++- 1 file changed, 187 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ee4285f9/tests/system_tests_three_routers.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py index 289cc02..07ea8fd 100644 --- a/tests/system_tests_three_routers.py +++ b/tests/system_tests_three_routers.py @@ -48,7 +48,9 @@ class RouterTest(TestCase): def router(name, more_config): - config = [ ('router', {'mode': 'interior', 'id': name}) ] + more_config + config = [ ('router', {'mode': 'interior', 'id': name}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}) + ] + more_config config = Qdrouterd.Config(config) @@ -180,6 +182,7 @@ class RouterTest(TestCase): cls.routers[2].wait_router_connected('B') cls.A_normal_addr = cls.routers[0].addresses[0] + cls.B_normal_addr = cls.routers[1].addresses[0] cls.C_normal_addr = cls.routers[2].addresses[0] cls.sender_addr = cls.C_normal_addr @@ -215,6 +218,15 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_05_closest ( self ): + test = ClosestTest ( self.A_normal_addr, + 
self.B_normal_addr, + self.C_normal_addr, + "addr_05" + ) + test.run() + self.assertEqual(None, test.error) + @@ -624,5 +636,179 @@ class LinkAttachRouting ( MessagingHandler ): container.run() + +class ClosestTest ( MessagingHandler ): + """ + Test whether distance-based message routing works in a + linear 3-router network. + + sender -----> NEAR -----> MID -----> FAR + | | | + v v v + near mid far + rcvrs rcvrs rcvrs + + With a linear network of 3 routers, set up a sender on the + near one, and then 2 receivers each on the near, middle, and + far routers. + After the first third of the expected messages has been received, close the + near receivers and check results so far. All of them should + have gone to the near receivers, and none to the mid or far + receivers. + After the next third has been received, close the two + middle receivers and check again. All of them should have + gone to the middle receivers, and none to the far ones. + Finally, after the last third has been received, check + that they went to the far receivers. + """ + def __init__ ( self, near_router, mid_router, far_router, addr_suffix ): + super ( ClosestTest, self ).__init__(prefetch=0) + self.error = None + self.near_router = near_router + self.mid_router = mid_router + self.far_router = far_router + self.addr_suffix = addr_suffix + self.dest = "closest/" + addr_suffix + + # This n_expected is actually the minimum number of messages + # I will send. The real number will be higher because some + # will be released when I close the near and middle receivers. + self.n_expected = 150 + self.one_third = self.n_expected / 3 + + # n_received -- the grand total -- is used to decide when to + # close the near receivers and later the middle ones. + self.n_received = 0 + + # Counters for the near, middle, and far receivers are used + # to determine whether there has been an error.
+ self.near_1 = 0 + self.near_2 = 0 + self.mid_1 = 0 + self.mid_2 = 0 + self.far_1 = 0 + self.far_2 = 0 + + + def timeout ( self ): + self.check_results ( ) + self.bail ( "Timeout Expired " ) + + + def bail ( self, text ): + self.timer.cancel() + self.error = text + self.send_cnx.close() + self.near_cnx.close() + self.mid_cnx.close() + self.far_cnx.close() + + + def on_start ( self, event ): + self.timer = event.reactor.schedule ( TIMEOUT, Timeout(self) ) + self.send_cnx = event.container.connect ( self.near_router ) + self.near_cnx = event.container.connect ( self.near_router ) + self.mid_cnx = event.container.connect ( self.mid_router ) + self.far_cnx = event.container.connect ( self.far_router ) + + self.sender = event.container.create_sender ( self.send_cnx, self.dest) + + # The two receiver-links on each router must be given + # explicit distinct names, or we will in fact only get + # one link. And then wonder why receiver 2 on each + # router isn't getting any messages. + self.near_recv_1 = event.container.create_receiver ( self.near_cnx, self.dest, name="1" ) + self.near_recv_2 = event.container.create_receiver ( self.near_cnx, self.dest, name="2" ) + + self.mid_recv_1 = event.container.create_receiver ( self.mid_cnx, self.dest, name="1" ) + self.mid_recv_2 = event.container.create_receiver ( self.mid_cnx, self.dest, name="2" ) + + self.far_recv_1 = event.container.create_receiver ( self.far_cnx, self.dest, name="1" ) + self.far_recv_2 = event.container.create_receiver ( self.far_cnx, self.dest, name="2" ) + + self.near_recv_1.flow ( self.n_expected ) + self.mid_recv_1.flow ( self.n_expected ) + self.far_recv_1.flow ( self.n_expected ) + + self.near_recv_2.flow ( self.n_expected ) + self.mid_recv_2.flow ( self.n_expected ) + self.far_recv_2.flow ( self.n_expected ) + + + def on_sendable ( self, event ): + msg = Message ( body = "Hello, closest.", + address = self.dest + ) + event.sender.send ( msg ) + + + def on_message ( self, event ): + + self.n_received 
+= 1 + + # Increment the near, mid, or far counts, depending on + # which receiver the message came in on. + if event.receiver == self.near_recv_1: + self.near_1 += 1 + elif event.receiver == self.near_recv_2: + self.near_2 += 1 + elif event.receiver == self.mid_recv_1: + self.mid_1 += 1 + elif event.receiver == self.mid_recv_2: + self.mid_2 += 1 + elif event.receiver == self.far_recv_1: + self.far_1 += 1 + elif event.receiver == self.far_recv_2: + self.far_2 += 1 + + if self.n_received == self.one_third: + # The first one-third of messages should have gone exclusively + # to the near receivers. At this point we should have + # no messages in the mid or far receivers. + self.near_recv_1.close() + self.near_recv_2.close() + if self.mid_1 + self.mid_2 + self.far_1 + self.far_2 > 0 : + self.bail ( "error: mid or far receivers got messages before near were closed." ) + # Make sure we got one third of the messages. + if self.near_1 + self.near_2 < self.one_third: + self.bail ( "error: the near receivers got too few messages." ) + # Make sure both receivers got some messages. + if self.near_1 * self.near_2 == 0: + self.bail ( "error: one of the near receivers got no messages." ) + + elif self.n_received == 2 * self.one_third: + # The next one-third of messages should have gone exclusively + # to the mid receivers. At this point we should have + # no messages in the far receivers. + self.mid_recv_1.close() + self.mid_recv_2.close() + if self.far_1 + self.far_2 > 0 : + self.bail ( "error: far receivers got messages before mid were closed." ) + # Make sure we got one third of the messages. + if self.mid_1 + self.mid_2 < self.one_third: + self.bail ( "error: the mid receivers got too few messages." ) + # Make sure both receivers got some messages. + if self.mid_1 * self.mid_2 == 0: + self.bail ( "error: one of the mid receivers got no messages." ) + + # By the time we reach the expected number of messages + # we have closed the near and middle receivers. 
If the far + # receivers are empty at this point, something is wrong. + if self.n_received >= self.n_expected : + if self.far_1 + self.far_2 < self.one_third: + self.bail ( "error: the far receivers got too few messages." ) + if self.far_1 * self.far_2 == 0: + self.bail ( "error: one of the far receivers got no messages." ) + else: + self.bail ( None ) + + + def run(self): + container = Container(self) + container.run() + + + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org