Repository: qpid-dispatch
Updated Branches:
  refs/heads/DISPATCH-938 a813cd363 -> 90a6e78fc
DISPATCH-947 - Modified 3 tests to not use Messenger

Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1242610d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1242610d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1242610d

Branch: refs/heads/DISPATCH-938
Commit: 1242610d6e784fd9fb86b77f812993cbd6ede22c
Parents: a813cd3
Author: Ganesh Murthy <gmur...@redhat.com>
Authored: Wed Mar 28 14:20:46 2018 -0400
Committer: Ganesh Murthy <gmur...@redhat.com>
Committed: Wed Mar 28 14:20:46 2018 -0400

----------------------------------------------------------------------
 tests/system_tests_one_router.py | 207 ++++++++++++++++++++--------------
 1 file changed, 125 insertions(+), 82 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1242610d/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index c20ccba..fc81cff 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -414,55 +414,9 @@ class OneRouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_21_semantics_closest(self):
-        addr = self.address+"/closest.1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-        M3 = self.messenger()
-        M4 = self.messenger()
-
-
-        M1.start()
-        M2.start()
-        M3.start()
-        M4.start()
-
-        M2.subscribe(addr)
-        M3.subscribe(addr)
-        M4.subscribe(addr)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-        for i in range(30):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        i = 0
-        rx_set = []
-        for i in range(10):
-            M2.recv(1)
-            M2.get(rm)
-            rx_set.append(rm.body['number'])
-
-            M3.recv(1)
-            M3.get(rm)
-            rx_set.append(rm.body['number'])
-
-            M4.recv(1)
-            M4.get(rm)
-            rx_set.append(rm.body['number'])
-
-        self.assertEqual(30, len(rx_set))
-        rx_set.sort()
-        for i in range(30):
-            self.assertEqual(i, rx_set[i])
-
-        M1.stop()
-        M2.stop()
-        M3.stop()
-        M4.stop()
+        test = SemanticsClosest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
 
     def test_22_semantics_balanced(self):
         test = SemanticsBalanced(self.address)
@@ -470,39 +424,8 @@ class OneRouterTest(TestCase):
         self.assertEqual(None, test.error)
 
     def test_23_to_override(self):
-        addr = self.address+"/toov/1"
-        M1 = self.messenger()
-        M2 = self.messenger()
-
-        M1.start()
-        M2.start()
-        M2.subscribe(addr)
-
-        tm = Message()
-        rm = Message()
-
-        tm.address = addr
-
-        ##
-        ## Pre-existing TO
-        ##
-        tm.annotations = {'x-opt-qd.to': 'toov/1'}
-        for i in range(10):
-            tm.body = {'number': i}
-            M1.put(tm)
-        M1.send()
-
-        for i in range(10):
-            M2.recv(1)
-            M2.get(rm)
-            self.assertEqual(i, rm.body['number'])
-            ma = rm.annotations
-            self.assertEqual(ma.__class__, dict)
-            self.assertEqual(ma['x-opt-qd.to'], 'toov/1')
-
-        M1.stop()
-        M2.stop()
-
+        test = MessageAnnotaionsPreExistingOverride(self.address)
+        test.run()
 
     def test_24_send_settle_mode_settled(self):
         """
@@ -697,6 +620,126 @@ class OneRouterTest(TestCase):
         client.connection.close()
 
 
+class SemanticsClosest(MessagingHandler):
+    def __init__(self, address):
+        super(SemanticsClosest, self).__init__()
+        self.address = address
+        self.dest = "closest.1"
+        self.timer = None
+        self.conn = None
+        self.sender = None
+        self.receiver_a = None
+        self.receiver_b = None
+        self.receiver_c = None
+        self.num_messages = 100
+        self.n_received_a = 0
+        self.n_received_b = 0
+        self.n_received_c = 0
+        self.error = None
+        self.n_sent = 0
+        self.rx_set = []
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn, self.dest)
+        # Receiver on same router as the sender must receive all the messages. The other two
+        # receivers are on the other router
+        self.receiver_a = event.container.create_receiver(self.conn, self.dest, name="A")
+        self.receiver_b = event.container.create_receiver(self.conn, self.dest, name="B")
+        self.receiver_c = event.container.create_receiver(self.conn, self.dest, name="C")
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d rcvd=%d/%d/%d" % \
+                     (self.n_sent, self.n_received_a, self.n_received_b, self.n_received_c)
+        self.conn.close()
+
+    def check_if_done(self):
+        if self.n_received_a + self.n_received_b + self.n_received_c == self.num_messages\
+                and self.n_received_b != 0 and self.n_received_c != 0:
+            self.rx_set.sort()
+            #print self.rx_set
+            all_messages_received = True
+            for i in range(self.num_messages):
+                if not i == self.rx_set[i]:
+                    all_messages_received = False
+
+            if all_messages_received:
+                self.timer.cancel()
+                self.conn.close()
+
+    def on_sendable(self, event):
+        if self.n_sent < self.num_messages:
+            msg = Message(body={'number': self.n_sent})
+            self.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver_a:
+            self.n_received_a += 1
+            self.rx_set.append(event.message.body['number'])
+        if event.receiver == self.receiver_b:
+            self.n_received_b += 1
+            self.rx_set.append(event.message.body['number'])
+        if event.receiver == self.receiver_c:
+            self.n_received_c += 1
+            self.rx_set.append(event.message.body['number'])
+
+    def on_accepted(self, event):
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
+
+class MessageAnnotaionsPreExistingOverride(MessagingHandler):
+    def __init__(self, address):
+        super(MessageAnnotaionsPreExistingOverride, self).__init__()
+        self.address = address
+        self.dest = "toov/1"
+        self.error = "Pre-existing x-opt-qd.to has been stripped"
+        self.timer = None
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.msg_not_sent = True
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn, self.dest)
+        self.receiver = event.container.create_receiver(self.conn, self.dest)
+
+    def timeout(self):
+        self.error = "Timeout Expired: Sent message not received"
+        self.conn.close()
+
+    def bail(self, message):
+        self.error = message
+        self.conn.close()
+        self.timer.cancel()
+
+    def on_sendable(self, event):
+        if self.msg_not_sent:
+            msg = Message(body={'number': 0})
+            msg.annotations = {'x-opt-qd.to': 'toov/1'}
+            event.sender.send(msg)
+            self.msg_not_sent = False
+
+    def on_message(self, event):
+        if 0 == event.message.body['number']:
+            ma = event.message.annotations
+            if ma['x-opt-qd.to'] == 'toov/1':
+                self.bail(None)
+            else:
+                self.bail("Pre-existing x-opt-qd.to has been stripped")
+        else:
+            self.bail("body does not match with the sent message body")
+
+    def run(self):
+        Container(self).run()
+
+
 class SemanticsMulticast(MessagingHandler):
     def __init__(self, address):
         super(SemanticsMulticast, self).__init__()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org