Repository: qpid-dispatch Updated Branches: refs/heads/master 8c9f4a581 -> 5fd599d4d
DISPATCH-209 -- three-router test #3 dynamic reply-to. This closes #163 (cherry picked from commit b66a5c7620409252287ae2e3993a17c56758beae) Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5fd599d4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5fd599d4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5fd599d4 Branch: refs/heads/master Commit: 5fd599d4df3e0dfef377419636e5fffe9537b296 Parents: 8c9f4a5 Author: mick goulish <mgoul...@redhat.com> Authored: Wed May 17 14:12:40 2017 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Wed May 24 09:38:55 2017 -0400 ---------------------------------------------------------------------- tests/system_tests_three_routers.py | 354 ++++++++++++++++++++++--------- 1 file changed, 253 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5fd599d4/tests/system_tests_three_routers.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_three_routers.py b/tests/system_tests_three_routers.py index 83719e5..cfc210c 100644 --- a/tests/system_tests_three_routers.py +++ b/tests/system_tests_three_routers.py @@ -22,7 +22,7 @@ from subprocess import PIPE, STDOUT from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process from proton.handlers import MessagingHandler -from proton.reactor import Container, AtMostOnce, AtLeastOnce +from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption import time @@ -39,56 +39,121 @@ class RouterTest(TestCase): @classmethod def setUpClass(cls): - """Start a router and a messenger""" + """Start a router and a sender-listener client""" super(RouterTest, 
cls).setUpClass() - def router(name, connection_1, connection_2=None): + def router ( name, connection_1, connection_2=None ): config = [ - ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}), - - ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), - - ('address', {'prefix': 'closest', 'distribution': 'closest'}), - ('address', {'prefix': 'spread', 'distribution': 'balanced'}), - ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) + ('router', + {'mode' : 'interior', + 'id' : 'QDR.%s' % name + } + ), + ('listener', + {'port' : cls.tester.get_port(), + 'stripAnnotations' : 'no' + } + ), + ('address', + { 'prefix' : 'closest', + 'distribution' : 'closest' + } + ), ] - config.append(connection_1) + config.append ( connection_1 ) if None != connection_2: - config.append(connection_2) + config.append ( connection_2 ) - config = Qdrouterd.Config(config) + config = Qdrouterd.Config ( config ) - cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + cls.routers.append ( cls.tester.qdrouterd(name, config, wait=True) ) cls.routers = [] - inter_router_port_1 = cls.tester.get_port() - inter_router_port_2 = cls.tester.get_port() - - # A <--- B <--- C - router('A', ('listener', {'role': 'inter-router', 'port': inter_router_port_1}) ) - - router('B', ('listener', {'role': 'inter-router', 'port': inter_router_port_2}), - ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port_1, 'verifyHostName': 'no'})) + inter_router_port_A = cls.tester.get_port() + inter_router_port_B = cls.tester.get_port() + port_for_sender = cls.tester.get_port() + + + router ( 'A', + ( 'listener', + {'role': 'inter-router', + 'port': inter_router_port_A + } + ) + ) + + router ( 'B', + ( 'listener', + { 'role': 'inter-router', + 'port': inter_router_port_B + } + ), + ( 'connector', + { 'name': 'connectorToA', + 'role': 'inter-router', + 'port': inter_router_port_A, + 'verifyHostName': 'no' + } + ) + ) + + router ( 'C', + ( 
'connector', + { 'name': 'connectorToB', + 'role': 'inter-router', + 'port': inter_router_port_B, + 'verifyHostName': 'no' + } + ), + ( 'listener', + { 'role': 'normal', + 'port': port_for_sender + } + ) + ) + + + cls.router_A = cls.routers[0] + cls.router_B = cls.routers[1] + cls.router_C = cls.routers[2] + + #---------------------------------------------- + # Wait until everybody can see everybody, + # to minimize the time when the network + # doesn't know how to route my messages. + #---------------------------------------------- + cls.router_C.wait_router_connected('QDR.B') + cls.router_B.wait_router_connected('QDR.A') + cls.router_A.wait_router_connected('QDR.C') + + #------------------------------------------------ + # In these tests, first address will be used + # by the sender, second by the receiver. + # + # receiver <--- A <--- B <--- C <--- sender + # + #------------------------------------------------ + cls.send_addr = cls.router_C.addresses[1] + cls.recv_addr = cls.router_A.addresses[0] - router('C', ('connector', {'name': 'connectorToB', 'role': 'inter-router', 'port': inter_router_port_2, 'verifyHostName': 'no'})) - - cls.routers[0].wait_router_connected('QDR.C') - cls.routers[1].wait_router_connected('QDR.B') - cls.routers[2].wait_router_connected('QDR.A') + def test_01_targeted_sender(self): + test = TargetedSenderTest ( self.send_addr, self.recv_addr ) + test.run() + self.assertEqual(None, test.error) + def test_02_anonymous_sender(self): + test = AnonymousSenderTest ( self.send_addr, self.recv_addr ) + test.run() + self.assertEqual(None, test.error) - def test_01_targeted_sender(self): - test = TargetedSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0]) + def test_03_dynamic_reply_to(self): + test = DynamicReplyTo ( self.send_addr, self.recv_addr ) test.run() self.assertEqual(None, test.error) -# def test_02_anonymous_sender(self): -# test = AnonymousSenderTest(self.routers[0].addresses[0], self.routers[2].addresses[0]) -# 
test.run() -# self.assertEqual(None, test.error) class Timeout(object): @@ -100,10 +165,10 @@ class Timeout(object): class TargetedSenderTest(MessagingHandler): - def __init__(self, address1, address2): + def __init__(self, send_addr, recv_addr): super(TargetedSenderTest, self).__init__(prefetch=0) - self.address1 = address1 - self.address2 = address2 + self.send_addr = send_addr + self.recv_addr = recv_addr self.dest = "closest.Targeted" self.error = None self.sender = None @@ -114,24 +179,29 @@ class TargetedSenderTest(MessagingHandler): self.n_accepted = 0 def timeout(self): - self.error = "Timeout Expired" - self.conn1.close() - self.conn2.close() + self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \ + (self.n_sent, self.n_received, self.n_accepted) + self.send_conn.close() + self.recv_conn.close() def on_start(self, event): - # receiver <--- A <--- B <---- C <--- sender - self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) - self.conn1 = event.container.connect(self.address1) - self.conn2 = event.container.connect(self.address2) - self.sender = event.container.create_sender(self.conn1, self.dest) - self.receiver = event.container.create_receiver(self.conn2, self.dest) + self.timer = event.reactor.schedule(10, Timeout(self)) + self.send_conn = event.container.connect(self.send_addr) + self.recv_conn = event.container.connect(self.recv_addr) + self.sender = event.container.create_sender(self.send_conn, self.dest) + self.receiver = event.container.create_receiver(self.recv_conn, self.dest) self.receiver.flow(self.n_expected) + + def send(self): + while self.sender.credit > 0 and self.n_sent < self.n_expected: + msg = Message(body=self.n_sent) + self.sender.send(msg) + self.n_sent += 1 + def on_sendable(self, event): if self.n_sent < self.n_expected: - msg = Message(body=self.n_sent) - event.sender.send(msg) - self.n_sent += 1 + self.send() def on_accepted(self, event): self.n_accepted += 1 @@ -140,8 +210,8 @@ class 
TargetedSenderTest(MessagingHandler): self.n_received += 1 if self.n_received == self.n_expected: self.receiver.close() - self.conn1.close() - self.conn2.close() + self.send_conn.close() + self.recv_conn.close() self.timer.cancel() def run(self): @@ -149,77 +219,159 @@ class TargetedSenderTest(MessagingHandler): +class DynamicTarget(LinkOption): + def apply(self, link): + link.target.dynamic = True + link.target.address = None + + + class AnonymousSenderTest(MessagingHandler): - def __init__(self, address1, address2): - super(AnonymousSenderTest, self).__init__(prefetch=0) - self.address1 = address1 - self.address2 = address2 - self.dest = "closest.Anonymous" - self.error = None - self.sender = None - self.receiver = None - self.n_expected = 10 + + def __init__(self, send_addr, recv_addr): + super(AnonymousSenderTest, self).__init__() + self.send_addr = send_addr + self.recv_addr = recv_addr + + self.error = None + self.recv_conn = None + self.send_conn = None + self.sender = None + self.receiver = None + self.address = None + + self.expected = 10 self.n_sent = 0 self.n_received = 0 self.n_accepted = 0 - def timeout(self): - self.error = "Timeout Expired %d messages received." % self.n_received - self.conn1.close() - self.conn2.close() - - # The problem with using an anonymous sender in a router - # network is that it takes finite time for endpoint information - # to propagate around the network. It is possible for me to - # start sending before my router knows how to route my messages, - # which will cause them to get released, and my test will hang, - # doomed to wait eternally for the tenth message to be received. - # To fix this, we will detect released messages here, and decrement - # the sent message counter, forcing a resend for each drop. - # And also pause for a moment, since we know that the network is - # not yet ready. 
- def on_released(self, event): - self.n_sent -= 1 - time.sleep(0.1) - def on_link_opened(self, event): - if event.receiver: - # This sender has no destination addr, so we will have to - # address each message individually. - # Also -- Create the sender here, when we know that the - # receiver link has opened, because then we are at least - # close to being able to send. (See comment above.) - self.sender = event.container.create_sender(self.conn1, None) + def timeout ( self ): + self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \ + (self.n_sent, self.n_received, self.n_accepted) + self.send_conn.close() + self.recv_conn.close() def on_start(self, event): - self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) - self.conn1 = event.container.connect(self.address1) - self.conn2 = event.container.connect(self.address2) - self.receiver = event.container.create_receiver(self.conn2, self.dest) - self.receiver.flow(self.n_expected) + self.timer = event.reactor.schedule(10, Timeout(self)) + self.send_conn = event.container.connect(self.send_addr) + self.recv_conn = event.container.connect(self.recv_addr) + self.sender = event.container.create_sender(self.send_conn, options=DynamicTarget()) - def on_sendable(self, event): - if self.n_sent < self.n_expected: - # Add the destination addr to each message. 
- msg = Message(body=self.n_sent, address=self.dest) - event.sender.send(msg) + def send(self): + while self.sender.credit > 0 and self.n_sent < self.expected: self.n_sent += 1 + m = Message(address=self.address, body="Message %d of %d" % (self.n_sent, self.expected)) + self.sender.send(m) + + def on_link_opened(self, event): + if event.sender == self.sender: + self.address = self.sender.remote_target.address + self.receiver = event.container.create_receiver(self.recv_conn, self.address) + + def on_sendable(self, event): + self.send() + + def on_message(self, event): + if event.receiver == self.receiver: + self.n_received += 1 def on_accepted(self, event): self.n_accepted += 1 + if self.n_accepted == self.expected: + self.send_conn.close() + self.recv_conn.close() + self.timer.cancel() + + def run(self): + Container(self).run() + + + + +#======================================================================= +# In this test we have a separate 'client' and 'server' with separate +# connections. The client sends requests to the server, and embeds in +# them its desired reply-to address. The server uses that address to +# send back ambiguous and noncommittal messages. The test ends with +# success if the client receives the expected number of replies, or +# with failure if we time out. 
+#======================================================================= +class DynamicReplyTo(MessagingHandler): + def __init__(self, client_addr, server_addr): + super(DynamicReplyTo, self).__init__(prefetch=10) + self.client_addr = client_addr + self.server_addr = server_addr + self.dest = "closest.dynamicRequestResponse" + self.error = None + self.server_receiver = None + self.client_receiver = None + self.sender = None + self.server_sender = None + self.n_expected = 10 + self.n_sent = 0 + self.received_by_server = 0 + self.received_by_client = 0 + + + def timeout(self): + self.error = "Timeout Expired: n_sent=%d received_by_server=%d received_by_client=%d" % \ + (self.n_sent, self.received_by_server, self.received_by_client) + self.client_connection.close() + self.server_connection.close() + + + def on_start ( self, event ): + self.timer = event.reactor.schedule ( 10, Timeout(self) ) + + # separate connections to simulate client and server. + self.client_connection = event.container.connect(self.client_addr) + self.server_connection = event.container.connect(self.server_addr) + + self.sender = event.container.create_sender(self.client_connection, self.dest) + self.server_sender = event.container.create_sender(self.server_connection, None) + + self.server_receiver = event.container.create_receiver(self.server_connection, self.dest) + self.client_receiver = event.container.create_receiver(self.client_connection, None, dynamic=True) + + + + def on_sendable(self, event): + while event.sender.credit > 0 and self.n_sent < self.n_expected: + # We send to server, and tell it how to reply to the client. 
+ reply_to_addr = self.client_receiver.remote_source.address + + request = Message ( body=self.n_sent, + address=self.dest, + reply_to = reply_to_addr ) + event.sender.send ( request ) + self.n_sent += 1 + def on_message(self, event): - self.n_received += 1 - if self.n_received == self.n_expected: - self.receiver.close() - self.conn1.close() - self.conn2.close() - self.timer.cancel() + # Server gets a request and responds to + # the address that is embedded in the message. + if event.receiver == self.server_receiver : + self.server_sender.send ( Message(address=event.message.reply_to, + body="Reply hazy, try again later.") ) + self.received_by_server += 1 + + # Client gets a response and counts it. + elif event.receiver == self.client_receiver : + self.received_by_client += 1 + if self.received_by_client == self.n_expected: + self.timer.cancel() + self.server_receiver.close() + self.client_receiver.close() + self.client_connection.close() + self.server_connection.close() + def run(self): Container(self).run() + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org