Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 93972821f -> 1f1089b10


DISPATCH-209 -- linkroute test based on multi tenancy

(cherry picked from commit 63b2b52c739d3fb9e090861a68b568b6d09262fb)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/1f1089b1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/1f1089b1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/1f1089b1

Branch: refs/heads/master
Commit: 1f1089b1052080645160fd3701004d386e4432b1
Parents: 9397282
Author: mick goulish <mgoul...@redhat.com>
Authored: Mon Jul 10 13:39:39 2017 -0400
Committer: mick goulish <mgoul...@redhat.com>
Committed: Mon Jul 10 13:43:32 2017 -0400

----------------------------------------------------------------------
 tests/system_tests_three_routers.py | 437 ++++++++++++++++++++++++-------
 1 file changed, 344 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/1f1089b1/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py 
b/tests/system_tests_three_routers.py
index cfc210c..289cc02 100644
--- a/tests/system_tests_three_routers.py
+++ b/tests/system_tests_three_routers.py
@@ -18,11 +18,14 @@
 #
 
 import unittest, os, json
-from subprocess import PIPE, STDOUT
-from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, 
SSLUnavailable, Timeout
-from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process
+from subprocess      import PIPE, STDOUT
+from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, 
SSLDomain, SSLUnavailable, Timeout
+from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, 
Process
 from proton.handlers import MessagingHandler
-from proton.reactor import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption
+from proton.reactor  import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption
+from proton.utils    import BlockingConnection
+from qpid_dispatch.management.client import Node
+
 import time
 
 
@@ -42,126 +45,178 @@ class RouterTest(TestCase):
         """Start a router and a sender-listener client"""
         super(RouterTest, cls).setUpClass()
 
-        def router ( name, connection_1, connection_2=None ):
-
-            config = [
-                ('router',
-                  {'mode' : 'interior',
-                   'id'   : 'QDR.%s' % name
-                  }
-                ),
-                ('listener',
-                  {'port'             : cls.tester.get_port(),
-                   'stripAnnotations' : 'no'
-                  }
-                ),
-                ('address',
-                    { 'prefix'       : 'closest',
-                      'distribution' : 'closest'
-                    }
-                ),
-            ]
-            config.append ( connection_1 )
-            if None != connection_2:
-                config.append ( connection_2 )
-
-            config = Qdrouterd.Config ( config )
-
-            cls.routers.append ( cls.tester.qdrouterd(name, config, wait=True) 
)
+        def router(name, more_config):
+
+
+            config = [ ('router', {'mode': 'interior', 'id': name}) ] + 
more_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
 
         cls.routers = []
 
-        inter_router_port_A = cls.tester.get_port()
         inter_router_port_B = cls.tester.get_port()
-        port_for_sender     = cls.tester.get_port()
 
+        A_normal_port          = cls.tester.get_port()
+        A_route_container_port = cls.tester.get_port()
+        A_inter_router_port    = cls.tester.get_port()
+
+        cls.linkroute_prefix = "0.0.0.0/link"
 
         router ( 'A',
+                 [
+                    ( 'listener',
+                      { 'port': A_normal_port,
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      { 'port': A_route_container_port,
+                        'stripAnnotations': 'no',
+                        'role': 'route-container'
+                      }
+                    ),
                    ( 'listener',
-                       {'role': 'inter-router',
-                        'port': inter_router_port_A
-                       }
+                     {  'role': 'inter-router',
+                        'port': A_inter_router_port,
+                     }
+                   ),
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'in',
+                       'containerId': 'LinkRouteTest'
+                     }
+                   ),
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'out',
+                       'containerId': 'LinkRouteTest'
+                     }
                    )
+                 ]
                )
 
+        B_normal_port       = cls.tester.get_port()
+        B_inter_router_port = cls.tester.get_port()
+
         router ( 'B',
-                   ( 'listener',
-                       { 'role': 'inter-router',
-                         'port': inter_router_port_B
-                       }
-                   ),
-                   ( 'connector',
-                       { 'name': 'connectorToA',
+                 [
+                    ( 'listener',
+                      { 'port': B_normal_port,
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      {  'role': 'inter-router',
+                         'port': B_inter_router_port
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToA',
                          'role': 'inter-router',
-                         'port': inter_router_port_A,
+                         'port': A_inter_router_port,
                          'verifyHostName': 'no'
-                       }
+                      }
+                   ),
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'in',
+                       'containerId': 'LinkRouteTest'
+                     }
+                   ),
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'out',
+                       'containerId': 'LinkRouteTest'
+                     }
                    )
+                 ]
                )
 
+        C_normal_port          = cls.tester.get_port()
+        C_route_container_port = cls.tester.get_port()
+
         router ( 'C',
-                   ( 'connector',
-                       { 'name': 'connectorToB',
+                 [
+                    ( 'listener',
+                      { 'port': C_normal_port,
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      { 'port': C_route_container_port,
+                        'stripAnnotations': 'no',
+                        'role': 'route-container'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToB',
                          'role': 'inter-router',
-                         'port': inter_router_port_B,
+                         'port': B_inter_router_port,
                          'verifyHostName': 'no'
-                       }
+                      }
                    ),
-                   ( 'listener',
-                       { 'role': 'normal',
-                         'port': port_for_sender
-                       }
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'in',
+                       'containerId': 'LinkRouteTest'
+                     }
+                   ),
+                   ( 'linkRoute',
+                     { 'prefix': cls.linkroute_prefix,
+                       'dir': 'out',
+                       'containerId': 'LinkRouteTest'
+                     }
                    )
+                 ]
                )
 
+        cls.routers[0].wait_router_connected('B')
+        cls.routers[0].wait_router_connected('C')
+        cls.routers[1].wait_router_connected('A')
+        cls.routers[1].wait_router_connected('C')
+        cls.routers[2].wait_router_connected('A')
+        cls.routers[2].wait_router_connected('B')
+
+        cls.A_normal_addr      = cls.routers[0].addresses[0]
+        cls.C_normal_addr      = cls.routers[2].addresses[0]
+
+        cls.sender_addr        = cls.C_normal_addr
+        cls.receiver_addr      = cls.A_normal_addr
 
-        cls.router_A = cls.routers[0]
-        cls.router_B = cls.routers[1]
-        cls.router_C = cls.routers[2]
-
-        #----------------------------------------------
-        # Wait until everybody can see everybody,
-        # to minimize the time when the network
-        # doesn't know how to route my messages.
-        #----------------------------------------------
-        cls.router_C.wait_router_connected('QDR.B')
-        cls.router_B.wait_router_connected('QDR.A')
-        cls.router_A.wait_router_connected('QDR.C')
-
-        #------------------------------------------------
-        # In these tests, first address will be used
-        # by the sender, second by the receiver.
-        #
-        #   receiver <--- A <--- B <--- C <--- sender
-        #
-        #------------------------------------------------
-        cls.send_addr = cls.router_C.addresses[1]
-        cls.recv_addr = cls.router_A.addresses[0]
-
-
-    def test_01_targeted_sender(self):
-        test = TargetedSenderTest ( self.send_addr, self.recv_addr )
+        # Whatever container connects to this addr
+        # will be the route-container for link-attach routing tests..
+        cls.C_route_container_addr = cls.routers[2].addresses[1]
+
+    def test_01_targeted_sender ( self ):
+        test = TargetedSenderTest ( self.sender_addr, self.receiver_addr )
         test.run()
-        self.assertEqual(None, test.error)
+        self.assertEqual ( None, test.error )
+
 
     def test_02_anonymous_sender(self):
-        test = AnonymousSenderTest ( self.send_addr, self.recv_addr )
+        test = AnonymousSenderTest ( self.sender_addr, self.receiver_addr )
         test.run()
-        self.assertEqual(None, test.error)
+        self.assertEqual ( None, test.error )
+
 
     def test_03_dynamic_reply_to(self):
-        test = DynamicReplyTo ( self.send_addr, self.recv_addr )
+        test = DynamicReplyTo ( self.sender_addr, self.receiver_addr )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+    def test_04_linkroute ( self ):
+        test = LinkAttachRouting ( self.A_normal_addr,
+                                   self.C_route_container_addr,
+                                   self.linkroute_prefix,
+                                   "addr_04"
+                                 )
         test.run()
         self.assertEqual(None, test.error)
 
 
 
-class Timeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.timeout()
 
 
 class TargetedSenderTest(MessagingHandler):
@@ -185,14 +240,13 @@ class TargetedSenderTest(MessagingHandler):
         self.recv_conn.close()
 
     def on_start(self, event):
-        self.timer = event.reactor.schedule(10, Timeout(self))
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.send_conn = event.container.connect(self.send_addr)
         self.recv_conn = event.container.connect(self.recv_addr)
         self.sender   = event.container.create_sender(self.send_conn, 
self.dest)
         self.receiver = event.container.create_receiver(self.recv_conn, 
self.dest)
         self.receiver.flow(self.n_expected)
 
-
     def send(self):
       while self.sender.credit > 0 and self.n_sent < self.n_expected:
         msg = Message(body=self.n_sent)
@@ -253,7 +307,7 @@ class AnonymousSenderTest(MessagingHandler):
         self.recv_conn.close()
 
     def on_start(self, event):
-        self.timer     = event.reactor.schedule(10, Timeout(self))
+        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
         self.send_conn = event.container.connect(self.send_addr)
         self.recv_conn = event.container.connect(self.recv_addr)
         self.sender    = event.container.create_sender(self.send_conn, 
options=DynamicTarget())
@@ -322,7 +376,7 @@ class DynamicReplyTo(MessagingHandler):
 
 
     def on_start ( self, event ):
-        self.timer             = event.reactor.schedule ( 10, Timeout(self) )
+        self.timer             = event.reactor.schedule ( TIMEOUT, 
Timeout(self) )
 
         # separate connections to simulate client and server.
         self.client_connection = event.container.connect(self.client_addr)
@@ -337,9 +391,13 @@ class DynamicReplyTo(MessagingHandler):
 
 
     def on_sendable(self, event):
+        reply_to_addr = self.client_receiver.remote_source.address
+
+        if reply_to_addr == None:
+          return
+
         while event.sender.credit > 0 and self.n_sent < self.n_expected:
             # We send to server, and tell it how to reply to the client.
-            reply_to_addr = self.client_receiver.remote_source.address
 
             request = Message ( body=self.n_sent,
                                 address=self.dest,
@@ -373,5 +431,198 @@ class DynamicReplyTo(MessagingHandler):
 
 
 
+
+class Entity(object):
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+
+
+class AddressChecker(object):
+    def __init__(self, reply_addr):
+        self.reply_addr = reply_addr
+
+    def response(self, msg):
+        ap = msg.properties
+        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
+
+    def read_address(self, name):
+        ap = {'operation': 'READ', 'type': 
'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+    def query_addresses(self):
+        ap = {'operation': 'QUERY', 'type': 
'org.apache.qpid.dispatch.router.address'}
+        return Message(properties=ap, reply_to=self.reply_addr)
+
+
+
+
+class LinkRouteCheckTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.linkroute_check_timeout()
+
+
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+
+class LinkAttachRouting ( MessagingHandler ):
+    """
+    There are two hosts: near, and far.  The far host is the one that
+    the route container will connect to, and it will receive our messages.
+    The near host is what our sender will attach to.
+    """
+    def __init__ ( self, nearside_host, farside_host, linkroute_prefix, 
addr_suffix ):
+        super ( LinkAttachRouting, self ).__init__(prefetch=0)
+        self.nearside_host         = nearside_host
+        self.farside_host          = farside_host
+        self.linkroute_prefix      = linkroute_prefix
+        self.link_routable_address = self.linkroute_prefix + '.' + addr_suffix
+
+        self.nearside_cnx             = None
+        self.farside_cnx              = None
+        self.error                    = None
+        self.nearside_sender          = None
+        self.farside_receiver         = None
+        self.linkroute_check_timer    = None
+        self.linkroute_check_receiver = None
+        self.linkroute_check_agent    = None
+
+        self.count     = 10
+        self.n_sent    = 0
+        self.n_rcvd    = 0
+        self.n_settled = 0
+
+
+    def timeout ( self ):
+        self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" %
+                    (self.n_sent, self.n_rcvd, self.n_settled) )
+
+    def linkroute_check_timeout(self):
+        self.linkroute_check()
+
+
+    def bail ( self, text ):
+        self.error = text
+        self.farside_cnx.close()
+        self.nearside_cnx.close()
+        self.timer.cancel()
+        if self.linkroute_check_timer:
+            self.linkroute_check_timer.cancel()
+
+
+    def on_start(self, event):
+        self.timer        = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.nearside_cnx = event.container.connect(self.nearside_host)
+
+        # Step 1: I make the far cnx.  Once this is done, if we later attach
+        # anywhere with a link whose address matches the link-attach routable
+        # prefix, the link-attach route will be formed.
+        self.farside_cnx = event.container.connect(self.farside_host)
+
+        # Since the route container will be connected to Farside, and
+        # my router network is linear, I make the linkroute checker attach
+        # to Nearside.
+        self.linkroute_check_receiver = 
event.container.create_receiver(self.nearside_cnx, dynamic=True)
+        self.linkroute_check_agent    = 
event.container.create_sender(self.nearside_cnx, "$management")
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow(self.count)
+        if event.receiver == self.linkroute_check_receiver:
+            # Step 2. my linkroute check-link has opened: make the 
linkroute_checker
+            self.linkroute_checker = 
AddressChecker(self.linkroute_check_receiver.remote_source.address)
+            self.linkroute_check()
+
+
+    def on_message(self, event):
+        if event.receiver == self.farside_receiver:
+            # This is a payload message.
+            self.n_rcvd += 1
+
+        elif event.receiver == self.linkroute_check_receiver:
+            # This is one of my route-readiness checking messages.
+            response = self.linkroute_checker.response(event.message)
+            if response.status_code == 200 and (response.remoteCount + 
response.containerCount) > 0:
+                # Step 3: got confirmation of link-attach knowledge fully 
propagated
+                # to Nearside router.  Now we can make the nearside sender 
without getting
+                # a No Path To Destination error.
+                self.nearside_sender = 
event.container.create_sender(self.nearside_cnx, self.link_routable_address)
+                # And we can quit checking.
+                if self.linkroute_check_timer:
+                    self.linkroute_check_timer.cancel()
+                    self.linkroute_check_timer = None
+            else:
+                # If the latest check did not find the link-attack route ready,
+                # schedule another check a little while from now.
+                self.linkroute_check_timer = event.reactor.schedule(0.25, 
LinkRouteCheckTimeout(self))
+
+
+    def on_link_opening ( self, event ):
+        if event.receiver:
+            # Step 4.  At start-up, I connected to the route-container 
listener on
+            # Farside, which makes me the route container.  So when a sender 
attaches
+            # to the network and wants to send to the linkroutable address, 
the router
+            # network creates the link-attach route, and then hands me a 
receiver for it.
+            if event.receiver.remote_target.address == 
self.link_routable_address:
+                self.farside_receiver = event.receiver
+                event.receiver.target.address = self.link_routable_address
+                event.receiver.open()
+            else:
+                self.bail("Incorrect address on incoming receiver: got %s, 
expected %s" %
+                          (event.receiver.remote_target.address, 
self.link_routable_address))
+
+
+    def on_sendable ( self, event ):
+        # Step 5: once there is someone on the network who can receive
+        # my messages, I get the go-ahead for my sender.
+        if event.sender == self.nearside_sender:
+            self.send()
+
+
+    def send ( self ):
+        while self.nearside_sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+            self.nearside_sender.send(m)
+
+
+    def linkroute_check ( self ):
+        # BUGALERT: We have to prepend the 'D' to this linkroute prefix
+        # because that's what the router does internally.  Someday this
+        # may change.
+        request = self.linkroute_checker.read_address("D" + 
self.linkroute_prefix )
+        self.linkroute_check_agent.send(request)
+
+
+    def on_settled(self, event):
+        if event.sender == self.nearside_sender:
+            self.n_settled += 1
+            if self.n_settled == self.count:
+                self.bail ( None )
+
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'LinkRouteTest'
+        container.run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to