Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 098ec7d13 -> 9739f93e0


DISPATCH-209 : change name and add new all-test topology


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9739f93e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9739f93e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9739f93e

Branch: refs/heads/master
Commit: 9739f93e0d94e3297def7a06b6e5ca2d67c9e6a3
Parents: 098ec7d
Author: mick goulish <mgoul...@redhat.com>
Authored: Mon Aug 14 15:18:51 2017 -0400
Committer: mick goulish <mgoul...@redhat.com>
Committed: Mon Aug 14 15:18:51 2017 -0400

----------------------------------------------------------------------
 tests/CMakeLists.txt                |    2 +-
 tests/system_tests_distribution.py  | 1356 ++++++++++++++++++++++++++++++
 tests/system_tests_three_routers.py |  814 ------------------
 3 files changed, 1357 insertions(+), 815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9739f93e/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8108434..fc9d548 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -92,7 +92,7 @@ foreach(py_test_module
     system_tests_user_id_proxy
     system_tests_deprecated
     system_tests_two_routers
-    system_tests_three_routers
+    system_tests_distribution
     system_tests_multi_tenancy
     system_tests_dynamic_terminus
     system_tests_log_message_components

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9739f93e/tests/system_tests_distribution.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_distribution.py 
b/tests/system_tests_distribution.py
new file mode 100644
index 0000000..59924e0
--- /dev/null
+++ b/tests/system_tests_distribution.py
@@ -0,0 +1,1356 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest, os, json
+from subprocess      import PIPE, STDOUT
+from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, 
SSLDomain, SSLUnavailable, Timeout
+from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, 
Process
+from proton.handlers import MessagingHandler
+from proton.reactor  import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption
+from proton.utils    import BlockingConnection
+from qpid_dispatch.management.client import Node
+
+import time
+
+
+# PROTON-828:
+try:
+    from proton import MODIFIED
+except ImportError:
+    from proton import PN_STATUS_MODIFIED as MODIFIED
+
+
+
+
+#------------------------------------------------
+# Helper classes for all tests.
+#------------------------------------------------
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+
+class AddressCheckResponse(object):
+    """
+    Convenience class for the responses returned by an AddressChecker.
+    """
+    def __init__(self, status_code, status_description, attrs):
+        self.status_code        = status_code
+        self.status_description = status_description
+        self.attrs              = attrs
+
+    def __getattr__(self, key):
+        return self.attrs[key]
+
+
+
+class AddressChecker ( object ):
+    """
+    Format address-query messages and parse the responses.
+    """
+    def __init__ ( self, reply_addr ):
+        self.reply_addr = reply_addr
+
+    def parse_address_query_response ( self, msg ):
+        ap = msg.properties
+        return AddressCheckResponse ( ap['statusCode'], 
ap['statusDescription'], msg.body )
+
+    def make_address_query ( self, name ):
+        ap = {'operation': 'READ', 'type': 
'org.apache.qpid.dispatch.router.address', 'name': name}
+        return Message ( properties=ap, reply_to=self.reply_addr )
+
+    def make_addresses_query ( self ):
+        ap = {'operation': 'QUERY', 'type': 
'org.apache.qpid.dispatch.router.address'}
+        return Message ( properties=ap, reply_to=self.reply_addr )
+
+
+
+class AddressCheckerTimeout ( object ):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.address_check_timeout()
+
+#------------------------------------------------
+# END Helper classes for all tests.
+#------------------------------------------------
+
+
+
+
+#================================================================
+#     Setup 
+#================================================================
+
+class DistributionTests ( TestCase ):
+
+    @classmethod
+    def setUpClass(cls):
+        """
+        Create a router topology that is a superset of the topologies we will
+        need for various tests.  So far, we have only two types of tests: 
+        3-router linear, and 3-router triangular.  The various tests simply
+        attach their senders and receivers appropriately to 'see' their
+        desired topology.
+        """
+        super(DistributionTests, cls).setUpClass()
+
+
+        def router(name, more_config):
+
+            config = [ ('router',  {'mode': 'interior', 'id': name}),
+                       ('address', {'prefix': 'closest',  'distribution': 
'closest'}),
+                       ('address', {'prefix': 'balanced', 'distribution': 
'balanced'})
+                     ] + more_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+
+
+        #                                                                      
    
+        #     Connection picture
+        #
+        #           1           1                                              
      
+        #         A <-------- B <------ C                                      
   
+        #          ^ 2       ^ 2                                               
     
+        #           \       /                                                  
    
+        #            \     /                                                   
    
+        #             \   /                                                    
    
+        #              \ /                                                     
    
+        #               D                                                      
    
+        #                                                                      
    
+        #
+
+        A_client_port          = cls.tester.get_port()
+        B_client_port          = cls.tester.get_port()
+        C_client_port          = cls.tester.get_port()
+        D_client_port          = cls.tester.get_port()
+
+        A_inter_router_port_1  = cls.tester.get_port()
+        A_inter_router_port_2  = cls.tester.get_port()
+        B_inter_router_port_1  = cls.tester.get_port()
+        B_inter_router_port_2  = cls.tester.get_port()
+
+        # "Route-container port" does not mean that the port
+        # contains a route.  It means that any client that 
+        # connectsd to the port is considered to be a route-
+        # container.
+        A_route_container_port = cls.tester.get_port()
+        B_route_container_port = cls.tester.get_port()
+        C_route_container_port = cls.tester.get_port()
+        D_route_container_port = cls.tester.get_port()
+
+        # Costs for balanced tests. The 'balanced' distribution
+        # takes these costs into account in its algorithm.
+        # Costs are associated not with routers, but with the
+        # connections between routers.  In the config, they may
+        # be attached to the inter-router listener, or the connector,
+        # or both.  If both the inter-router listener and the 
+        # connector have associated costs, the higher of the two
+        # will be used.
+        cls.A_B_cost =   10
+        cls.B_C_cost =   20
+        cls.A_D_cost =   50
+        cls.B_D_cost =  100
+
+        cls.linkroute_prefix = "0.0.0.0/linkroute"
+
+        router ( 'A',
+                 [
+                    ( 'listener',
+                      { 'port': A_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      {  'role': 'inter-router',
+                         'port': A_inter_router_port_1
+                      }
+                    ),
+                    ( 'listener',
+                      {  'role': 'inter-router',
+                         'port': A_inter_router_port_2
+                      }
+                    ),
+                    ( 'listener',
+                      { 'port': A_route_container_port,
+                        'stripAnnotations': 'no',
+                        'role': 'route-container'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'in',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'out',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    )
+                 ] 
+               )
+
+        router ( 'B',
+                 [  
+                    ( 'listener',
+                      { 'port': B_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                      {  'role': 'inter-router',
+                         'port': B_inter_router_port_1
+                      }
+                    ),
+                    ( 'listener',
+                      {  'role': 'inter-router',
+                         'port': B_inter_router_port_2
+                      }
+                    ),
+                    ( 'listener',
+                      { 'port': B_route_container_port,
+                        'stripAnnotations': 'no',
+                        'role': 'route-container'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'in',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'out',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToA',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port_1,
+                         'verifyHostName': 'no',
+                         'cost':  cls.A_B_cost
+                      }
+                    )
+                 ]
+               )
+
+        router ( 'C',
+                 [
+                    ( 'listener',
+                      { 'port': C_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                       { 'port': C_route_container_port,
+                         'stripAnnotations': 'no',
+                         'role': 'route-container'
+                       }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'in',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'out',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToB',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port_1,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_C_cost
+                      }
+                    )
+                 ]
+               )
+
+        router ( 'D',
+                 [
+                    ( 'listener',
+                      { 'port': D_client_port,
+                        'role': 'normal',
+                        'stripAnnotations': 'no'
+                      }
+                    ),
+                    ( 'listener',
+                       { 'port': D_route_container_port,
+                         'stripAnnotations': 'no',
+                         'role': 'route-container'
+                       }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToA',
+                         'role': 'inter-router',
+                         'port': A_inter_router_port_2,
+                         'verifyHostName': 'no',
+                         'cost' : cls.A_D_cost
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'in',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'linkRoute',
+                      { 'prefix': cls.linkroute_prefix,
+                        'dir': 'out',
+                        'containerId': 'LinkRouteTest'
+                      }
+                    ),
+                    ( 'connector',
+                      {  'name': 'connectorToB',
+                         'role': 'inter-router',
+                         'port': B_inter_router_port_2,
+                         'verifyHostName': 'no',
+                         'cost' : cls.B_D_cost
+                      }
+                    )
+                 ]
+               )
+
+        router_A = cls.routers[0]
+        router_B = cls.routers[1]
+        router_C = cls.routers[2]
+        router_D = cls.routers[3]
+
+        cls.A_route_container_addr = router_A.addresses[3]
+        cls.B_route_container_addr = router_B.addresses[3]
+        cls.C_route_container_addr = router_B.addresses[1]
+        cls.D_route_container_addr = router_B.addresses[1]
+
+        router_A.wait_router_connected('B')
+        router_A.wait_router_connected('C')
+        router_A.wait_router_connected('D')
+
+        cls.A_addr = router_A.addresses[0]
+        cls.B_addr = router_B.addresses[0]
+        cls.C_addr = router_C.addresses[0]
+        cls.D_addr = router_D.addresses[0]
+
+    def test_01_targeted_sender_AC ( self ):
+        test = TargetedSenderTest ( self.A_addr, self.C_addr, "closest/01" )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+    def test_02_targeted_sender_DC ( self ):
+        test = TargetedSenderTest ( self.D_addr, self.C_addr, "closest/02" )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+    def test_03_anonymous_sender_AC ( self ):
+        test = AnonymousSenderTest ( self.A_addr, self.C_addr )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+    def test_04_anonymous_sender_DC ( self ):
+        test = AnonymousSenderTest ( self.D_addr, self.C_addr )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+    def test_05_dynamic_reply_to_AC ( self ):
+        test = DynamicReplyTo ( self.A_addr, self.C_addr )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+    def test_06_dynamic_reply_to_DC ( self ):
+        test = DynamicReplyTo ( self.D_addr, self.C_addr )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+    def test_07_linkroute ( self ):
+        test = LinkAttachRouting ( self.C_addr,
+                                   self.A_route_container_addr,
+                                   self.linkroute_prefix,
+                                   "addr_07"
+                                 )
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+    def test_08_closest ( self ):
+        test = ClosestTest ( self.A_addr,
+                             self.B_addr,
+                             self.C_addr,
+                             "addr_08"
+                           )
+        test.run()
+        self.assertEqual(None, test.error)
+
+        #                                                                      
    
+        #     Cost picture for balanced distribution tests.
+        #
+        #              10          20
+        #         A <-------- B <------ C                                      
   
+        #          ^         ^                                                 
     
+        #           \       /                                                  
    
+        #       50   \     /  100                                              
    
+        #             \   /                                                    
    
+        #              \ /                                                     
    
+        #               D                                                      
    
+        #                                                                      
    
+
+           
+        #  Here is how the message balancing should work for 
+        #  various total number of messages, up to 100:
+        # 
+        #  NOTE: remember these messages are all unsettled.
+        #        And will stay that way.  This is not a realistic
+        #        usage scenario, but it the best way to test the 
+        #        balanced distribution algorithm.
+        #
+        #  1. Messages start flowing in at A.  They will all
+        #     be used by A (sent to its receiver) until the
+        #     total == cost ( A, B ).
+        # 
+        #  2. At that point, A will start sharing with B,
+        #     one-for-me-one-for-you. (So A will go to 11 before
+        #     B gets its first message.)
+        #
+        #  3. A and B will count up until B reaches 
+        #     cost ( B, C )
+        #     B will then start sharings its messages with C,
+        #     one-for-me-one-for-you.  (So B will go to 21 before
+        #     C gets its first message.)
+        #
+        #  4. However note: it is NOT round-robin at this point.
+        #     A is still taking every other message, B is only getting
+        #     A's overflow, and now B is sharing half of that with C.
+        #     So at this point B will start falling farther behind A.
+        #  
+        #  5. The totals here are completely deterministic, so we pass
+        #     to the test a 'slop' amount of 0.
+        #     
+        #    total   near --10--> mid ---20--> far
+        #
+        #     1        1            0            0
+        #     10      10            0            0
+        #     11      11            0            0
+        #     12      11            1            0
+        #     13      12            1            0
+        #     14      12            2            0
+        #     ...
+        #     50      30           20            0
+        #     51      31           20            0
+        #     52      31           21            0
+        #     53      32           21            0
+        #     54      32           21            1
+        #     55      33           21            1
+        #     56      33           22            1
+        #     57      34           22            1
+        #     58      34           22            2
+        #     59      35           22            2
+        #     60      35           23            2
+        #     ...
+        #     100     55           33           12
+        #
+
+ 
+    def test_09_balanced_linear ( self ):
+        # slop is how much the second two values may diverge from 
+        # the expected.  But they still must sum to total - A.
+        total      = 100
+        expected_A = 55
+        expected_B = 33
+        expected_C = 12
+        slop       = 0
+        test = BalancedTest ( self.A_addr,
+                              self.B_addr,
+                              self.C_addr,
+                              "addr_09",
+                              total,
+                              expected_A,
+                              expected_B,
+                              expected_C,
+                              slop
+                            )
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+        #     Reasoning for the triangular balanced case:
+
+        #     Cost picture
+        #
+        #              10          20
+        #         A <-------- B <------ C                                      
         
+        #          ^         ^
+        #           \       /
+        #       50   \     /  100
+        #             \   /
+        #              \ /
+        #               D
+        #
+        # We are doing  ( A, B, D ), with the sender attached at A.
+        # All these messages are unsettled, which is what allows us to
+        # see how the balanced distribution algorithm works.
+        #
+        #  1. total unsettled msgs at A cannot be more than B_cost + 1,
+        #     and also cannot be more than D_cost + 1
+        #
+        #  2. A will always keep the message for itself (for its own receiver)
+        #     if it can do so without violating rule (1).
+        #
+        #  3. So, A will count up to 11, and then it will start alternating 
+        #     with B.  
+        #
+        #  4. When A counts up to 51, it must also start sharing with D.
+        #     It will alternate between B and D.
+        #
+        #  5. As long as B does not yet have 100 messages, it will not 
+        #     share with D.
+        #
+        #  6. So! at 100 messages total, A must be above both of its 
+        #     neighbors by that neighbor's cost, or 1 more -- and the total 
+        #     of all 3 must sum to 100.
+        #     
+        #     A = B + 10      B = A - 10
+        #     A = D + 50      D = A - 50
+        #     A + B + D == 100
+        #     -->
+        #     A + (A - 10) + (A - 50) == 100
+        #     3A - 60 == 100
+        #     A == 53.333...
+        #     A == 54
+        #
+        #     so B + D == 46
+        #     A is 10 or 11 > B --> B == 44 or 43
+        #     A is 50 or 51 > D --> D ==  4 or  3
+        #     B == 43 and D == 3
+
+        #     So pass these values in to the test: (54, 43, 3) 
+        #     and test that:
+        #       1. A is exactly that value.
+        #       2. B and D sum to 100 - A
+        #       3. B and D are both with 1 of their expected values.
+        #
+    def test_10_balanced_triangle ( self ):
+        total      = 100
+        expected_A = 54
+        expected_B = 43
+        expected_C = 3
+        slop       = 1
+        test = BalancedTest ( self.A_addr,
+                              self.B_addr,
+                              self.D_addr,
+                              "addr_10",
+                              total,
+                              expected_A,
+                              expected_B,
+                              expected_C,
+                              slop
+                            )
+        test.run()
+        self.assertEqual(None, test.error)
+
+
+
+
+
+
+
+
+
+
+
+#================================================================
+#     Tests 
+#================================================================
+
+
+class TargetedSenderTest ( MessagingHandler ):
+    """
+    A 'targeted' sender is one in which we tell the router what 
+    address we want to send to. (As opposed to letting the router
+    pass back an address to us.)
+    """
+    def __init__ ( self, send_addr, recv_addr, destination ):
+        super(TargetedSenderTest, self).__init__(prefetch=0)
+        self.send_addr  = send_addr
+        self.recv_addr  = recv_addr
+        self.dest       = destination
+        self.error      = None
+        self.sender     = None
+        self.receiver   = None
+        self.n_expected = 10
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" 
% \
+                     (self.n_sent, self.n_received, self.n_accepted)
+        self.send_conn.close()
+        self.recv_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.send_conn = event.container.connect(self.send_addr)
+        self.recv_conn = event.container.connect(self.recv_addr)
+        self.sender   = event.container.create_sender(self.send_conn, 
self.dest)
+        self.receiver = event.container.create_receiver(self.recv_conn, 
self.dest)
+        self.receiver.flow(self.n_expected)
+
+    def send(self):
+      while self.sender.credit > 0 and self.n_sent < self.n_expected:
+        msg = Message(body=self.n_sent)
+        self.sender.send(msg)
+        self.n_sent += 1
+
+    def on_sendable(self, event):
+        if self.n_sent < self.n_expected:
+            self.send()
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        if self.n_received == self.n_expected:
+            self.receiver.close()
+            self.send_conn.close()
+            self.recv_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+
+class DynamicTarget(LinkOption):
+
+    def apply(self, link):
+        link.target.dynamic = True
+        link.target.address = None
+
+
+
+class AnonymousSenderTest ( MessagingHandler ):
+    """
+    An 'anonymous' sender is one in which we let the router tell
+    us what address the sender should use.  It will pass back this
+    information to us when we get the on_link_opened event.
+    """
+
+    def __init__(self, send_addr, recv_addr):
+        super(AnonymousSenderTest, self).__init__()
+        self.send_addr = send_addr
+        self.recv_addr = recv_addr
+
+        self.error     = None
+        self.recv_conn = None
+        self.send_conn = None
+        self.sender    = None
+        self.receiver  = None
+        self.address   = None
+
+        self.expected   = 10
+        self.n_sent     = 0
+        self.n_received = 0
+        self.n_accepted = 0
+
+
+    def timeout ( self ):
+        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" 
% \
+                     (self.n_sent, self.n_received, self.n_accepted)
+        self.send_conn.close()
+        self.recv_conn.close()
+
+    def on_start(self, event):
+        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.send_conn = event.container.connect(self.send_addr)
+        self.recv_conn = event.container.connect(self.recv_addr)
+        self.sender    = event.container.create_sender(self.send_conn, 
options=DynamicTarget())
+
+    def send(self):
+        while self.sender.credit > 0 and self.n_sent < self.expected:
+            self.n_sent += 1
+            m = Message(address=self.address, body="Message %d of %d" % 
(self.n_sent, self.expected))
+            self.sender.send(m)
+
+    def on_link_opened(self, event):
+        if event.sender == self.sender:
+            # Here we are told the address that we will use for the sender.
+            self.address = self.sender.remote_target.address
+            self.receiver = event.container.create_receiver(self.recv_conn, 
self.address)
+
+    def on_sendable(self, event):
+        self.send()
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            self.n_received += 1
+
+    def on_accepted(self, event):
+        self.n_accepted += 1
+        if self.n_accepted == self.expected:
+            self.send_conn.close()
+            self.recv_conn.close()
+            self.timer.cancel()
+
+    def run(self):
+        Container(self).run()
+
+
+
+
+
+#=======================================================================
+#=======================================================================
+class DynamicReplyTo(MessagingHandler):
+    """
+    In this test we have a separate 'client' and 'server' with separate
+    connections.  The client sends requests to the server, and embeds in
+    them its desired reply-to address.  The server uses that address to
+    send back messages.  The tests ends with success if the client receives 
+    the expected number of replies, or with failure if we time out before
+    that happens.
+    """
+    def __init__(self, client_addr, server_addr):
+        super(DynamicReplyTo, self).__init__(prefetch=10)
+        self.client_addr        = client_addr
+        self.server_addr        = server_addr
+        self.dest               = "closest.dynamicRequestResponse"
+        self.error              = None
+        self.server_receiver    = None
+        self.client_receiver    = None
+        self.sender             = None
+        self.server_sender      = None
+        self.n_expected         = 10
+        self.n_sent             = 0
+        self.received_by_server = 0
+        self.received_by_client = 0
+
+
+    def timeout(self):
+        self.error = "Timeout Expired: n_sent=%d received_by_server=%d 
received_by_client=%d" % \
+                     (self.n_sent, self.received_by_server, 
self.received_by_client)
+        self.client_connection.close()
+        self.server_connection.close()
+
+
+    def on_start ( self, event ):
+        self.timer             = event.reactor.schedule ( TIMEOUT, 
Timeout(self) )
+        # separate connections to simulate client and server.
+        self.client_connection = event.container.connect(self.client_addr)
+        self.server_connection = event.container.connect(self.server_addr)
+
+        self.sender            = 
event.container.create_sender(self.client_connection, self.dest)
+        self.server_sender     = 
event.container.create_sender(self.server_connection, None)
+
+        self.server_receiver   = 
event.container.create_receiver(self.server_connection, self.dest)
+        self.client_receiver   = 
event.container.create_receiver(self.client_connection, None, dynamic=True)
+
+
+
+    def on_sendable(self, event):
+        reply_to_addr = self.client_receiver.remote_source.address
+
+        if reply_to_addr == None:
+          return
+
+        while event.sender.credit > 0 and self.n_sent < self.n_expected:
+            # We send to server, and tell it how to reply to the client.
+            request = Message ( body=self.n_sent,
+                                address=self.dest,
+                                reply_to = reply_to_addr )
+            event.sender.send ( request )
+            self.n_sent += 1
+
+
+    def on_message(self, event):
+        # Server gets a request and responds to
+        # the address that is embedded in the message.
+        if event.receiver == self.server_receiver :
+            self.server_sender.send ( Message(address=event.message.reply_to,
+                                      body="Reply hazy, try again later.") )
+            self.received_by_server += 1
+
+        # Client gets a response and counts it.
+        elif event.receiver == self.client_receiver :
+            self.received_by_client += 1
+            if self.received_by_client == self.n_expected:
+                self.timer.cancel()
+                self.server_receiver.close()
+                self.client_receiver.close()
+                self.client_connection.close()
+                self.server_connection.close()
+
+
+    def run(self):
+        Container(self).run()
+
+
+
+
+class LinkAttachRouting ( MessagingHandler ):
+    """
+    There are two hosts: near, and far.  The far host is the one that
+    the route container will connect to, and it will receive our messages.
+    The near host is what our sender will attach to.
+    """
+    def __init__ ( self, nearside_host, farside_host, linkroute_prefix, 
addr_suffix ):
+        super ( LinkAttachRouting, self ).__init__(prefetch=0)
+        self.nearside_host         = nearside_host
+        self.farside_host          = farside_host
+        self.linkroute_prefix      = linkroute_prefix
+        self.link_routable_address = self.linkroute_prefix + '.' + addr_suffix
+
+        self.nearside_cnx             = None
+        self.farside_cnx              = None
+        self.error                    = None
+        self.nearside_sender          = None
+        self.farside_receiver         = None
+        self.linkroute_check_timer    = None
+        self.linkroute_check_receiver = None
+        self.linkroute_check_sender   = None
+
+        self.count     = 10
+        self.n_sent    = 0
+        self.n_rcvd    = 0
+        self.n_settled = 0
+
+
+    def timeout ( self ):
+        self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" %
+                    (self.n_sent, self.n_rcvd, self.n_settled) )
+
+
+    def address_check_timeout(self):
+        self.linkroute_check()
+
+
+    def bail ( self, text ):
+        self.error = text
+        self.farside_cnx.close()
+        self.nearside_cnx.close()
+        self.timer.cancel()
+        if self.linkroute_check_timer:
+            self.linkroute_check_timer.cancel()
+
+
+    def on_start(self, event):
+        self.timer        = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.nearside_cnx = event.container.connect(self.nearside_host)
+
+        # Step 1: I make the far cnx.  Once this is done, if we later attach
+        # anywhere with a link whose address matches the link-attach routable
+        # prefix, the link-attach route will be formed.
+        self.farside_cnx = event.container.connect(self.farside_host)
+
+        # Since the route container will be connected to Farside, and
+        # my router network is linear, I make the linkroute checker attach
+        # to Nearside.
+        self.linkroute_check_receiver = 
event.container.create_receiver(self.nearside_cnx, dynamic=True)
+        self.linkroute_check_sender   = 
event.container.create_sender(self.nearside_cnx, "$management")
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow(self.count)
+        if event.receiver == self.linkroute_check_receiver:
+            # Step 2. my linkroute check-link has opened: make the 
linkroute_checker
+            self.linkroute_checker = 
AddressChecker(self.linkroute_check_receiver.remote_source.address)
+            self.linkroute_check()
+
+
+    def on_message(self, event):
+        if event.receiver == self.farside_receiver:
+            # This is a payload message.
+            self.n_rcvd += 1
+
+        elif event.receiver == self.linkroute_check_receiver:
+            # This is one of my route-readiness checking messages.
+            response = 
self.linkroute_checker.parse_address_query_response(event.message)
+            if response.status_code == 200 and (response.remoteCount + 
response.containerCount) > 0:
+                # Step 3: got confirmation of link-attach knowledge fully 
propagated
+                # to Nearside router.  Now we can make the nearside sender 
without getting
+                # a No Path To Destination error.
+                self.nearside_sender = 
event.container.create_sender(self.nearside_cnx, self.link_routable_address)
+                # And we can quit checking.
+                if self.linkroute_check_timer:
+                    self.linkroute_check_timer.cancel()
+                    self.linkroute_check_timer = None
+            else:
+                # If the latest check did not find the link-attack route ready,
+                # schedule another check a little while from now.
+                self.linkroute_check_timer = event.reactor.schedule(0.25, 
AddressCheckerTimeout(self))
+
+
+    def on_link_opening ( self, event ):
+        if event.receiver:
+            # Step 4.  At start-up, I connected to the route-container 
listener on
+            # Farside, which makes me the route container.  So when a sender 
attaches
+            # to the network and wants to send to the linkroutable address, 
the router
+            # network creates the link-attach route, and then hands me a 
receiver for it.
+            if event.receiver.remote_target.address == 
self.link_routable_address:
+                self.farside_receiver = event.receiver
+                event.receiver.target.address = self.link_routable_address
+                event.receiver.open()
+            else:
+                self.bail("Incorrect address on incoming receiver: got %s, 
expected %s" %
+                          (event.receiver.remote_target.address, 
self.link_routable_address))
+
+
+    def on_sendable ( self, event ):
+        # Step 5: once there is someone on the network who can receive
+        # my messages, I get the go-ahead for my sender.
+        if event.sender == self.nearside_sender:
+            self.send()
+
+
+    def send ( self ):
+        while self.nearside_sender.credit > 0 and self.n_sent < self.count:
+            self.n_sent += 1
+            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
+            self.nearside_sender.send(m)
+
+
+    def linkroute_check ( self ):
+        # Send the message that will query the management code to discover
+        # information about our destination address. We cannot make our payload
+        # sender until the network is ready.
+        #
+        # BUGALERT: We have to prepend the 'D' to this linkroute prefix
+        # because that's what the router does internally.  Someday this
+        # may change.
+        self.linkroute_check_sender.send ( 
self.linkroute_checker.make_address_query("D" + self.linkroute_prefix) )
+
+
+    def on_settled(self, event):
+        if event.sender == self.nearside_sender:
+            self.n_settled += 1
+            if self.n_settled == self.count:
+                self.bail ( None )
+
+
+    def run(self):
+        container = Container(self)
+        container.container_id = 'LinkRouteTest'
+        container.run()
+
+
+
+class ClosestTest ( MessagingHandler ):
+    """
+    Test whether distance-based message routing works in a
+    linear 3-router network.
+
+    sender -----> NEAR -----> MID -----> FAR
+                   |           |          |
+                   v           v          v
+                  near        mid        far
+                  rcvrs       rcvrs      rcvrs
+
+    With a linear network of 3 routers, set up a sender on the
+    near one, and then 2 receivers each on the near, middle, and
+    far routers.
+    After the first 10 messages have been received, close the
+    near routers and check results so far.  All 10 messages should
+    have gone to the near receivers, and none to the mid or far
+    receivers.
+    After the next 10 messages have been received, close the two
+    middle routers and check again.  All 10 messages should have
+    gone to the middle receivers, and none to the far ones.
+    Finally, after another 10 messages have been received, check
+    that they went to the far receivers.
+    """
+    def __init__ ( self, near_router, mid_router, far_router, addr_suffix ):
+        super ( ClosestTest, self ).__init__(prefetch=0)
+        self.error         = None
+        self.near_router   = near_router
+        self.mid_router    = mid_router
+        self.far_router    = far_router
+        self.addr_suffix   = addr_suffix
+        self.dest          = "closest/" + addr_suffix
+
+        # This n_expected is actually the minimum number of messages
+        # I will send.  The real number will be higher because some
+        # will be released when I close the near and middle receivers.
+        self.n_expected = 300
+        self.one_third  = self.n_expected / 3
+
+        # n_received -- the grand total -- is used to decide when to
+        # close the near receivers and later the middle ones.
+        self.n_received    = 0
+
+        # Counters for the near, middle, and far receivers are used
+        # to determine whether there has been an error.
+        self.near_1_count = 0
+        self.near_2_count = 0
+        self.mid_1_count  = 0
+        self.mid_2_count  = 0
+        self.far_1_count  = 0
+        self.far_2_count  = 0
+
+        self.addr_check_timer    = None
+        self.addr_check_receiver = None
+        self.addr_check_sender   = None
+
+    def timeout ( self ):
+        self.check_results ( )
+        self.bail ( "Timeout Expired " )
+
+
+    def address_check_timeout(self):
+        self.addr_check()
+
+
+    def bail ( self, text ):
+        self.timer.cancel()
+        self.error = text
+        self.send_cnx.close()
+        self.near_cnx.close()
+        self.mid_cnx.close()
+        self.far_cnx.close()
+        if self.addr_check_timer:
+            self.addr_check_timer.cancel()
+
+
+    def on_start ( self, event ):
+        self.timer    = event.reactor.schedule  ( TIMEOUT, Timeout(self) )
+        self.send_cnx = event.container.connect ( self.near_router )
+        self.near_cnx = event.container.connect ( self.near_router )
+        self.mid_cnx  = event.container.connect ( self.mid_router )
+        self.far_cnx  = event.container.connect ( self.far_router )
+
+        # Warning!
+        # The two receiver-links on each router must be given
+        # explicit distinct names, or we will in fact only get
+        # one link.  And then wonder why receiver 2 on each
+        # router isn't getting any messages.
+        self.near_recv_1 = event.container.create_receiver  ( self.near_cnx, 
self.dest, name="1" )
+        self.near_recv_2 = event.container.create_receiver  ( self.near_cnx, 
self.dest, name="2" )
+
+        self.mid_recv_1  = event.container.create_receiver  ( self.mid_cnx,  
self.dest, name="1" )
+        self.mid_recv_2  = event.container.create_receiver  ( self.mid_cnx,  
self.dest, name="2" )
+
+        self.far_recv_1  = event.container.create_receiver  ( self.far_cnx,  
self.dest, name="1" )
+        self.far_recv_2  = event.container.create_receiver  ( self.far_cnx,  
self.dest, name="2" )
+
+        self.near_recv_1.flow ( self.n_expected )
+        self.mid_recv_1.flow  ( self.n_expected )
+        self.far_recv_1.flow  ( self.n_expected )
+
+        self.near_recv_2.flow ( self.n_expected )
+        self.mid_recv_2.flow  ( self.n_expected )
+        self.far_recv_2.flow  ( self.n_expected )
+
+        self.addr_check_receiver = event.container.create_receiver ( 
self.near_cnx, dynamic=True )
+        self.addr_check_sender   = event.container.create_sender ( 
self.near_cnx, "$management" )
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow ( self.n_expected )
+        if event.receiver == self.addr_check_receiver:
+            # Step 2. my addr-check link has opened: make the addr_checker 
with the given address.
+            self.addr_checker = AddressChecker ( 
self.addr_check_receiver.remote_source.address )
+            self.addr_check()
+
+
+    def on_sendable ( self, event ):
+        msg = Message ( body     = "Hello, closest.",
+                        address  = self.dest
+                      )
+        event.sender.send ( msg )
+
+
+    def on_message ( self, event ):
+
+        if event.receiver == self.addr_check_receiver:
+            # This is a response to one of my address-readiness checking 
messages.
+            response = 
self.addr_checker.parse_address_query_response(event.message)
+            if response.status_code == 200 and response.subscriberCount == 2 
and response.remoteCount == 2:
+                # now we know that we have two subscribers on nearside router, 
and two remote
+                # routers that know about the address. The network is ready.
+                # Now we can make the nearside sender without getting a 
+                # "No Path To Destination" error.
+                self.sender = event.container.create_sender ( self.send_cnx, 
self.dest )
+
+                # And we can quit checking.
+                if self.addr_check_timer:
+                    self.addr_check_timer.cancel()
+                    self.addr_check_timer = None
+            else:
+                # If the latest check did not find the link-attack route ready,
+                # schedule another check a little while from now.
+                self.addr_check_timer = event.reactor.schedule(0.25, 
AddressCheckerTimeout(self))
+        else :
+            # This is a payload message.
+            self.n_received += 1
+
+            # Increment the near, mid, or far counts, depending on
+            # which receiver the message came in on.
+            if event.receiver == self.near_recv_1:
+                self.near_1_count        += 1
+            elif event.receiver == self.near_recv_2:
+                self.near_2_count        += 1
+            elif event.receiver == self.mid_recv_1:
+                self.mid_1_count         += 1
+            elif event.receiver == self.mid_recv_2:
+                self.mid_2_count         += 1
+            elif event.receiver == self.far_recv_1:
+                self.far_1_count         += 1
+            elif event.receiver == self.far_recv_2:
+                self.far_2_count         += 1
+
+            if self.n_received == self.one_third:
+                # The first one-third of messages should have gone exclusively
+                # to the near receivers.  At this point we should have
+                # no messages in the mid or far receivers.
+                self.near_recv_1.close()
+                self.near_recv_2.close()
+                if self.mid_1_count + self.mid_2_count + self.far_1_count + 
self.far_2_count > 0 :
+                    self.bail ( "error: mid or far receivers got messages 
before near were closed." )
+                # Make sure we got one third of the messages.
+                if self.near_1_count + self.near_2_count < self.one_third:
+                    self.bail ( "error: the near receivers got too few 
messages." )
+                # Make sure both receivers got some messages.
+                if self.near_1_count * self.near_2_count == 0:
+                    self.bail ( "error: one of the near receivers got no 
messages." )
+
+            elif self.n_received == 2 * self.one_third:
+                # The next one-third of messages should have gone exclusively
+                # to the mid receivers.  At this point we should have
+                # no messages in the far receivers.
+                self.mid_recv_1.close()
+                self.mid_recv_2.close()
+                if self.far_1_count + self.far_2_count > 0 :
+                    self.bail ( "error: far receivers got messages before mid 
were closed." )
+                # Make sure we got one third of the messages.
+                if self.mid_1_count + self.mid_2_count < self.one_third:
+                    self.bail ( "error: the mid receivers got too few 
messages." )
+                # Make sure both receivers got some messages.
+                if self.mid_1_count * self.mid_2_count == 0:
+                    self.bail ( "error: one of the mid receivers got no 
messages." )
+
+            # By the time we reach the expected number of messages
+            # we have closed the near and middle receivers.  If the far
+            # receivers are empty at this point, something is wrong.
+            if self.n_received >= self.n_expected :
+                if self.far_1_count + self.far_2_count < self.one_third:
+                    self.bail ( "error: the far receivers got too few 
messages." )
+                if self.far_1_count * self.far_2_count == 0:
+                    self.bail ( "error: one of the far receivers got no 
messages." )
+                else:
+                    self.bail ( None )
+
+
+    def addr_check ( self ):
+        # Send the message that will query the management code to discover
+        # information about our destination address. We cannot make our payload
+        # sender until the network is ready.
+        #
+        # BUGALERT: We have to prepend the 'M0' to this address prefix
+        # because that's what the router does internally.  Someday this
+        # may change.
+        self.addr_check_sender.send ( 
self.addr_checker.make_address_query("M0" + self.dest) )
+
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
+
+
+
+class BalancedTest ( MessagingHandler ):
+    """
+    This test is topology-agnostic. This code thinks of its nodes as 1, 2, 3.
+    The caller knows if they are linear or triangular, or a tree.  It 
calculates 
+    the expected results for nodes 1, 2, and 3, and also tells me if there can 
be
+    a little 'slop' in the results. 
+    ( Slop can happen in some topologies when you can't tell whether spillover 
+    will happen first to node 2, or to node 3.
+    """
+    def __init__ ( self, router_1, router_2, router_3, addr_suffix, 
total_messages, expected_1, expected_2, expected_3, slop ):
+        super ( BalancedTest, self ).__init__(prefetch=0, auto_accept=False)
+        self.error       = None
+        self.router_3    = router_3
+        self.router_2    = router_2
+        self.router_1    = router_1
+        self.addr_suffix = addr_suffix
+        self.dest        = "balanced/" + addr_suffix
+
+        self.total_messages  = total_messages
+        self.n_sent          = 0
+        self.n_received      = 0
+
+        self.count_3 = 0
+        self.count_2 = 0
+        self.count_1 = 0
+
+        self.expected_1 = expected_1
+        self.expected_2 = expected_2
+        self.expected_3 = expected_3
+        self.slop       = slop
+
+        self.address_check_timer    = None
+        self.address_check_receiver = None
+        self.address_check_sender   = None
+
+        self.payload_sender = None
+
+
+    def timeout ( self ):
+        self.bail ( "Timeout Expired " )
+
+
+    def address_check_timeout(self):
+        self.address_check()
+
+
+    def bail ( self, text ):
+        self.timer.cancel()
+        self.error = text
+        self.cnx_3.close()
+        self.cnx_2.close()
+        self.cnx_1.close()
+        if self.address_check_timer:
+            self.address_check_timer.cancel()
+
+
+    def on_start ( self, event ):
+        self.timer    = event.reactor.schedule  ( TIMEOUT, Timeout(self) )
+        self.cnx_3    = event.container.connect ( self.router_3 )
+        self.cnx_2    = event.container.connect ( self.router_2 )
+        self.cnx_1    = event.container.connect ( self.router_1 )
+
+        self.recv_3  = event.container.create_receiver ( self.cnx_3,  
self.dest )
+        self.recv_2  = event.container.create_receiver ( self.cnx_2,  
self.dest )
+        self.recv_1  = event.container.create_receiver ( self.cnx_1,  
self.dest )
+
+        self.recv_3.flow ( self.total_messages )
+        self.recv_2.flow ( self.total_messages )
+        self.recv_1.flow ( self.total_messages )
+
+        self.address_check_receiver = event.container.create_receiver ( 
self.cnx_1, dynamic=True )
+        self.address_check_sender   = event.container.create_sender   ( 
self.cnx_1, "$management" )
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow ( self.total_messages )
+        if event.receiver == self.address_check_receiver:
+            # My address check-link has opened: make the address_checker
+            self.address_checker = AddressChecker ( 
self.address_check_receiver.remote_source.address )
+            self.address_check()
+
+
+    def on_message ( self, event ):
+
+        if self.n_received >= self.total_messages:
+            return   # Sometimes you can get a message or two even after you 
have called bail().
+
+        if event.receiver == self.address_check_receiver:
+            # This is one of my route-readiness checking messages.
+            response = 
self.address_checker.parse_address_query_response(event.message)
+            if response.status_code == 200 and response.subscriberCount == 1 
and response.remoteCount == 2:
+                # Got confirmation of dest addr fully propagated through 
network.
+                # Since I have 3 nodes, I want to see 1 subscriber (which is 
on the local router) and
+                # 2 remote routers that know about my destination address.
+                # Now we can safely make the payload sender without getting a 
'No Path To Destination' error.
+                self.payload_sender = event.container.create_sender ( 
self.cnx_1, self.dest )
+                # And we can quit checking.
+                if self.address_check_timer:
+                    self.address_check_timer.cancel()
+                    self.address_check_timer = None
+            else:
+                # If the latest check did not find the link-attack route ready,
+                # schedule another check a little while from now.
+                self.address_check_timer = event.reactor.schedule(0.50, 
AddressCheckerTimeout(self))
+
+        else:
+            self.n_received += 1
+
+            if   event.receiver == self.recv_1: self.count_1 += 1
+            elif event.receiver == self.recv_2: self.count_2 += 1
+            elif event.receiver == self.recv_3: self.count_3 += 1
+
+            # I do not check for count_1 + count_2 + count_3 == total,
+            # because it always will be due to how the code counts things.
+            if self.n_received == self.total_messages:
+                if self.count_1 != self.expected_1:
+                    self.bail ( "bad count 1: cxount %d != expected %d" % 
(self.count_1, self.expected_1) )
+                elif abs(self.count_2 - self.expected_2) > self.slop:
+                    self.bail ( "count_2 %d is more than %d different from 
expectation %d" % (self.count_2, self.slop, self.expected_2) )
+                elif abs(self.count_3 - self.expected_3) > self.slop:
+                    self.bail ( "count_3 %d is more than %d different from 
expectation %d" % (self.count_3, self.slop, self.expected_3) )
+                else:
+                    self.bail ( None) # All is well.
+
+
+    def on_sendable ( self, event ):
+        if self.n_sent < self.total_messages and event.sender == 
self.payload_sender :
+            msg = Message ( body     = "Hello, balanced.",
+                            address  = self.dest
+                          )
+            self.payload_sender.send ( msg )
+            self.n_sent += 1
+
+
+    def address_check ( self ):
+        # Send the message that will query the management code to discover
+        # information about our destination address. We cannot make our payload
+        # sender until the network is ready.
+        #
+        # BUGALERT: We have to prepend the 'M0' to this address prefix
+        # because that's what the router does internally.  Someday this
+        # may change.
+        self.address_check_sender.send ( 
self.address_checker.make_address_query("M0" + self.dest) )
+
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
+
+
+
+if __name__ == '__main__':
+    unittest.main(main_module())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9739f93e/tests/system_tests_three_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_three_routers.py 
b/tests/system_tests_three_routers.py
deleted file mode 100644
index 07ea8fd..0000000
--- a/tests/system_tests_three_routers.py
+++ /dev/null
@@ -1,814 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import unittest, os, json
-from subprocess      import PIPE, STDOUT
-from proton          import Message, PENDING, ACCEPTED, REJECTED, RELEASED, 
SSLDomain, SSLUnavailable, Timeout
-from system_test     import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, 
Process
-from proton.handlers import MessagingHandler
-from proton.reactor  import Container, AtMostOnce, AtLeastOnce, 
DynamicNodeProperties, LinkOption
-from proton.utils    import BlockingConnection
-from qpid_dispatch.management.client import Node
-
-import time
-
-
-# PROTON-828:
-try:
-    from proton import MODIFIED
-except ImportError:
-    from proton import PN_STATUS_MODIFIED as MODIFIED
-
-
-class RouterTest(TestCase):
-
-    inter_router_port = None
-
-    @classmethod
-    def setUpClass(cls):
-        """Start a router and a sender-listener client"""
-        super(RouterTest, cls).setUpClass()
-
-        def router(name, more_config):
-
-
-            config = [ ('router',  {'mode': 'interior', 'id': name}),
-                       ('address', {'prefix': 'closest', 'distribution': 
'closest'})
-                     ] + more_config
-
-            config = Qdrouterd.Config(config)
-
-            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
-
-        cls.routers = []
-
-        inter_router_port_B = cls.tester.get_port()
-
-        A_normal_port          = cls.tester.get_port()
-        A_route_container_port = cls.tester.get_port()
-        A_inter_router_port    = cls.tester.get_port()
-
-        cls.linkroute_prefix = "0.0.0.0/link"
-
-        router ( 'A',
-                 [
-                    ( 'listener',
-                      { 'port': A_normal_port,
-                        'stripAnnotations': 'no'
-                      }
-                    ),
-                    ( 'listener',
-                      { 'port': A_route_container_port,
-                        'stripAnnotations': 'no',
-                        'role': 'route-container'
-                      }
-                    ),
-                   ( 'listener',
-                     {  'role': 'inter-router',
-                        'port': A_inter_router_port,
-                     }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'in',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'out',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   )
-                 ]
-               )
-
-        B_normal_port       = cls.tester.get_port()
-        B_inter_router_port = cls.tester.get_port()
-
-        router ( 'B',
-                 [
-                    ( 'listener',
-                      { 'port': B_normal_port,
-                        'stripAnnotations': 'no'
-                      }
-                    ),
-                    ( 'listener',
-                      {  'role': 'inter-router',
-                         'port': B_inter_router_port
-                      }
-                    ),
-                    ( 'connector',
-                      {  'name': 'connectorToA',
-                         'role': 'inter-router',
-                         'port': A_inter_router_port,
-                         'verifyHostName': 'no'
-                      }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'in',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'out',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   )
-                 ]
-               )
-
-        C_normal_port          = cls.tester.get_port()
-        C_route_container_port = cls.tester.get_port()
-
-        router ( 'C',
-                 [
-                    ( 'listener',
-                      { 'port': C_normal_port,
-                        'stripAnnotations': 'no'
-                      }
-                    ),
-                    ( 'listener',
-                      { 'port': C_route_container_port,
-                        'stripAnnotations': 'no',
-                        'role': 'route-container'
-                      }
-                    ),
-                    ( 'connector',
-                      {  'name': 'connectorToB',
-                         'role': 'inter-router',
-                         'port': B_inter_router_port,
-                         'verifyHostName': 'no'
-                      }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'in',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   ),
-                   ( 'linkRoute',
-                     { 'prefix': cls.linkroute_prefix,
-                       'dir': 'out',
-                       'containerId': 'LinkRouteTest'
-                     }
-                   )
-                 ]
-               )
-
-        cls.routers[0].wait_router_connected('B')
-        cls.routers[0].wait_router_connected('C')
-        cls.routers[1].wait_router_connected('A')
-        cls.routers[1].wait_router_connected('C')
-        cls.routers[2].wait_router_connected('A')
-        cls.routers[2].wait_router_connected('B')
-
-        cls.A_normal_addr      = cls.routers[0].addresses[0]
-        cls.B_normal_addr      = cls.routers[1].addresses[0]
-        cls.C_normal_addr      = cls.routers[2].addresses[0]
-
-        cls.sender_addr        = cls.C_normal_addr
-        cls.receiver_addr      = cls.A_normal_addr
-
-        # Whatever container connects to this addr
-        # will be the route-container for link-attach routing tests..
-        cls.C_route_container_addr = cls.routers[2].addresses[1]
-
-    def test_01_targeted_sender ( self ):
-        test = TargetedSenderTest ( self.sender_addr, self.receiver_addr )
-        test.run()
-        self.assertEqual ( None, test.error )
-
-
-    def test_02_anonymous_sender(self):
-        test = AnonymousSenderTest ( self.sender_addr, self.receiver_addr )
-        test.run()
-        self.assertEqual ( None, test.error )
-
-
-    def test_03_dynamic_reply_to(self):
-        test = DynamicReplyTo ( self.sender_addr, self.receiver_addr )
-        test.run()
-        self.assertEqual ( None, test.error )
-
-    def test_04_linkroute ( self ):
-        test = LinkAttachRouting ( self.A_normal_addr,
-                                   self.C_route_container_addr,
-                                   self.linkroute_prefix,
-                                   "addr_04"
-                                 )
-        test.run()
-        self.assertEqual(None, test.error)
-
-    def test_05_closest ( self ):
-        test = ClosestTest ( self.A_normal_addr,
-                             self.B_normal_addr,
-                             self.C_normal_addr,
-                             "addr_05"
-                           )
-        test.run()
-        self.assertEqual(None, test.error)
-
-
-
-
-
-class TargetedSenderTest(MessagingHandler):
-    def __init__(self, send_addr, recv_addr):
-        super(TargetedSenderTest, self).__init__(prefetch=0)
-        self.send_addr = send_addr
-        self.recv_addr = recv_addr
-        self.dest = "closest.Targeted"
-        self.error      = None
-        self.sender     = None
-        self.receiver   = None
-        self.n_expected = 10
-        self.n_sent     = 0
-        self.n_received = 0
-        self.n_accepted = 0
-
-    def timeout(self):
-        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" 
% \
-                     (self.n_sent, self.n_received, self.n_accepted)
-        self.send_conn.close()
-        self.recv_conn.close()
-
-    def on_start(self, event):
-        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.send_conn = event.container.connect(self.send_addr)
-        self.recv_conn = event.container.connect(self.recv_addr)
-        self.sender   = event.container.create_sender(self.send_conn, 
self.dest)
-        self.receiver = event.container.create_receiver(self.recv_conn, 
self.dest)
-        self.receiver.flow(self.n_expected)
-
-    def send(self):
-      while self.sender.credit > 0 and self.n_sent < self.n_expected:
-        msg = Message(body=self.n_sent)
-        self.sender.send(msg)
-        self.n_sent += 1
-
-    def on_sendable(self, event):
-        if self.n_sent < self.n_expected:
-            self.send()
-
-    def on_accepted(self, event):
-        self.n_accepted += 1
-
-    def on_message(self, event):
-        self.n_received += 1
-        if self.n_received == self.n_expected:
-            self.receiver.close()
-            self.send_conn.close()
-            self.recv_conn.close()
-            self.timer.cancel()
-
-    def run(self):
-        Container(self).run()
-
-
-
-class DynamicTarget(LinkOption):
-    def apply(self, link):
-        link.target.dynamic = True
-        link.target.address = None
-
-
-
-class AnonymousSenderTest(MessagingHandler):
-
-    def __init__(self, send_addr, recv_addr):
-        super(AnonymousSenderTest, self).__init__()
-        self.send_addr = send_addr
-        self.recv_addr = recv_addr
-
-        self.error     = None
-        self.recv_conn = None
-        self.send_conn = None
-        self.sender    = None
-        self.receiver  = None
-        self.address   = None
-
-        self.expected   = 10
-        self.n_sent     = 0
-        self.n_received = 0
-        self.n_accepted = 0
-
-
-    def timeout ( self ):
-        self.error = "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" 
% \
-                     (self.n_sent, self.n_received, self.n_accepted)
-        self.send_conn.close()
-        self.recv_conn.close()
-
-    def on_start(self, event):
-        self.timer     = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.send_conn = event.container.connect(self.send_addr)
-        self.recv_conn = event.container.connect(self.recv_addr)
-        self.sender    = event.container.create_sender(self.send_conn, 
options=DynamicTarget())
-
-    def send(self):
-        while self.sender.credit > 0 and self.n_sent < self.expected:
-            self.n_sent += 1
-            m = Message(address=self.address, body="Message %d of %d" % 
(self.n_sent, self.expected))
-            self.sender.send(m)
-
-    def on_link_opened(self, event):
-        if event.sender == self.sender:
-            self.address = self.sender.remote_target.address
-            self.receiver = event.container.create_receiver(self.recv_conn, 
self.address)
-
-    def on_sendable(self, event):
-        self.send()
-
-    def on_message(self, event):
-        if event.receiver == self.receiver:
-            self.n_received += 1
-
-    def on_accepted(self, event):
-        self.n_accepted += 1
-        if self.n_accepted == self.expected:
-            self.send_conn.close()
-            self.recv_conn.close()
-            self.timer.cancel()
-
-    def run(self):
-        Container(self).run()
-
-
-
-
-#=======================================================================
-# In this test we have a separate 'client' and 'server' with separate
-# connections.  The client sends requests to the server, and embeds in
-# them its desired reply-to address.  The server uses that address to
-# send back ambiguous and noncommittal messages.  The tests ends with
-# success if the client receives the expected number of replies, or
-# with failure if we time out.
-#=======================================================================
-class DynamicReplyTo(MessagingHandler):
-    def __init__(self, client_addr, server_addr):
-        super(DynamicReplyTo, self).__init__(prefetch=10)
-        self.client_addr        = client_addr
-        self.server_addr        = server_addr
-        self.dest               = "closest.dynamicRequestResponse"
-        self.error              = None
-        self.server_receiver    = None
-        self.client_receiver    = None
-        self.sender             = None
-        self.server_sender      = None
-        self.n_expected         = 10
-        self.n_sent             = 0
-        self.received_by_server = 0
-        self.received_by_client = 0
-
-
-    def timeout(self):
-        self.error = "Timeout Expired: n_sent=%d received_by_server=%d 
received_by_client=%d" % \
-                     (self.n_sent, self.received_by_server, 
self.received_by_client)
-        self.client_connection.close()
-        self.server_connection.close()
-
-
-    def on_start ( self, event ):
-        self.timer             = event.reactor.schedule ( TIMEOUT, 
Timeout(self) )
-
-        # separate connections to simulate client and server.
-        self.client_connection = event.container.connect(self.client_addr)
-        self.server_connection = event.container.connect(self.server_addr)
-
-        self.sender            = 
event.container.create_sender(self.client_connection, self.dest)
-        self.server_sender     = 
event.container.create_sender(self.server_connection, None)
-
-        self.server_receiver   = 
event.container.create_receiver(self.server_connection, self.dest)
-        self.client_receiver   = 
event.container.create_receiver(self.client_connection, None, dynamic=True)
-
-
-
-    def on_sendable(self, event):
-        reply_to_addr = self.client_receiver.remote_source.address
-
-        if reply_to_addr == None:
-          return
-
-        while event.sender.credit > 0 and self.n_sent < self.n_expected:
-            # We send to server, and tell it how to reply to the client.
-
-            request = Message ( body=self.n_sent,
-                                address=self.dest,
-                                reply_to = reply_to_addr )
-            event.sender.send ( request )
-            self.n_sent += 1
-
-
-    def on_message(self, event):
-        # Server gets a request and responds to
-        # the address that is embedded in the message.
-        if event.receiver == self.server_receiver :
-            self.server_sender.send ( Message(address=event.message.reply_to,
-                                      body="Reply hazy, try again later.") )
-            self.received_by_server += 1
-
-        # Client gets a response and counts it.
-        elif event.receiver == self.client_receiver :
-            self.received_by_client += 1
-            if self.received_by_client == self.n_expected:
-                self.timer.cancel()
-                self.server_receiver.close()
-                self.client_receiver.close()
-                self.client_connection.close()
-                self.server_connection.close()
-
-
-    def run(self):
-        Container(self).run()
-
-
-
-
-
-class Entity(object):
-    def __init__(self, status_code, status_description, attrs):
-        self.status_code        = status_code
-        self.status_description = status_description
-        self.attrs              = attrs
-
-    def __getattr__(self, key):
-        return self.attrs[key]
-
-
-
-
-class AddressChecker(object):
-    def __init__(self, reply_addr):
-        self.reply_addr = reply_addr
-
-    def response(self, msg):
-        ap = msg.properties
-        return Entity(ap['statusCode'], ap['statusDescription'], msg.body)
-
-    def read_address(self, name):
-        ap = {'operation': 'READ', 'type': 
'org.apache.qpid.dispatch.router.address', 'name': name}
-        return Message(properties=ap, reply_to=self.reply_addr)
-
-    def query_addresses(self):
-        ap = {'operation': 'QUERY', 'type': 
'org.apache.qpid.dispatch.router.address'}
-        return Message(properties=ap, reply_to=self.reply_addr)
-
-
-
-
-class LinkRouteCheckTimeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.linkroute_check_timeout()
-
-
-
-class Timeout(object):
-    def __init__(self, parent):
-        self.parent = parent
-
-    def on_timer_task(self, event):
-        self.parent.timeout()
-
-
-
-class LinkAttachRouting ( MessagingHandler ):
-    """
-    There are two hosts: near, and far.  The far host is the one that
-    the route container will connect to, and it will receive our messages.
-    The near host is what our sender will attach to.
-    """
-    def __init__ ( self, nearside_host, farside_host, linkroute_prefix, 
addr_suffix ):
-        super ( LinkAttachRouting, self ).__init__(prefetch=0)
-        self.nearside_host         = nearside_host
-        self.farside_host          = farside_host
-        self.linkroute_prefix      = linkroute_prefix
-        self.link_routable_address = self.linkroute_prefix + '.' + addr_suffix
-
-        self.nearside_cnx             = None
-        self.farside_cnx              = None
-        self.error                    = None
-        self.nearside_sender          = None
-        self.farside_receiver         = None
-        self.linkroute_check_timer    = None
-        self.linkroute_check_receiver = None
-        self.linkroute_check_agent    = None
-
-        self.count     = 10
-        self.n_sent    = 0
-        self.n_rcvd    = 0
-        self.n_settled = 0
-
-
-    def timeout ( self ):
-        self.bail ( "Timeout Expired: n_sent=%d n_rcvd=%d n_settled=%d" %
-                    (self.n_sent, self.n_rcvd, self.n_settled) )
-
-    def linkroute_check_timeout(self):
-        self.linkroute_check()
-
-
-    def bail ( self, text ):
-        self.error = text
-        self.farside_cnx.close()
-        self.nearside_cnx.close()
-        self.timer.cancel()
-        if self.linkroute_check_timer:
-            self.linkroute_check_timer.cancel()
-
-
-    def on_start(self, event):
-        self.timer        = event.reactor.schedule(TIMEOUT, Timeout(self))
-        self.nearside_cnx = event.container.connect(self.nearside_host)
-
-        # Step 1: I make the far cnx.  Once this is done, if we later attach
-        # anywhere with a link whose address matches the link-attach routable
-        # prefix, the link-attach route will be formed.
-        self.farside_cnx = event.container.connect(self.farside_host)
-
-        # Since the route container will be connected to Farside, and
-        # my router network is linear, I make the linkroute checker attach
-        # to Nearside.
-        self.linkroute_check_receiver = 
event.container.create_receiver(self.nearside_cnx, dynamic=True)
-        self.linkroute_check_agent    = 
event.container.create_sender(self.nearside_cnx, "$management")
-
-
-    def on_link_opened(self, event):
-        if event.receiver:
-            event.receiver.flow(self.count)
-        if event.receiver == self.linkroute_check_receiver:
-            # Step 2. my linkroute check-link has opened: make the 
linkroute_checker
-            self.linkroute_checker = 
AddressChecker(self.linkroute_check_receiver.remote_source.address)
-            self.linkroute_check()
-
-
-    def on_message(self, event):
-        if event.receiver == self.farside_receiver:
-            # This is a payload message.
-            self.n_rcvd += 1
-
-        elif event.receiver == self.linkroute_check_receiver:
-            # This is one of my route-readiness checking messages.
-            response = self.linkroute_checker.response(event.message)
-            if response.status_code == 200 and (response.remoteCount + 
response.containerCount) > 0:
-                # Step 3: got confirmation of link-attach knowledge fully 
propagated
-                # to Nearside router.  Now we can make the nearside sender 
without getting
-                # a No Path To Destination error.
-                self.nearside_sender = 
event.container.create_sender(self.nearside_cnx, self.link_routable_address)
-                # And we can quit checking.
-                if self.linkroute_check_timer:
-                    self.linkroute_check_timer.cancel()
-                    self.linkroute_check_timer = None
-            else:
-                # If the latest check did not find the link-attack route ready,
-                # schedule another check a little while from now.
-                self.linkroute_check_timer = event.reactor.schedule(0.25, 
LinkRouteCheckTimeout(self))
-
-
-    def on_link_opening ( self, event ):
-        if event.receiver:
-            # Step 4.  At start-up, I connected to the route-container 
listener on
-            # Farside, which makes me the route container.  So when a sender 
attaches
-            # to the network and wants to send to the linkroutable address, 
the router
-            # network creates the link-attach route, and then hands me a 
receiver for it.
-            if event.receiver.remote_target.address == 
self.link_routable_address:
-                self.farside_receiver = event.receiver
-                event.receiver.target.address = self.link_routable_address
-                event.receiver.open()
-            else:
-                self.bail("Incorrect address on incoming receiver: got %s, 
expected %s" %
-                          (event.receiver.remote_target.address, 
self.link_routable_address))
-
-
-    def on_sendable ( self, event ):
-        # Step 5: once there is someone on the network who can receive
-        # my messages, I get the go-ahead for my sender.
-        if event.sender == self.nearside_sender:
-            self.send()
-
-
-    def send ( self ):
-        while self.nearside_sender.credit > 0 and self.n_sent < self.count:
-            self.n_sent += 1
-            m = Message(body="Message %d of %d" % (self.n_sent, self.count))
-            self.nearside_sender.send(m)
-
-
-    def linkroute_check ( self ):
-        # BUGALERT: We have to prepend the 'D' to this linkroute prefix
-        # because that's what the router does internally.  Someday this
-        # may change.
-        request = self.linkroute_checker.read_address("D" + 
self.linkroute_prefix )
-        self.linkroute_check_agent.send(request)
-
-
-    def on_settled(self, event):
-        if event.sender == self.nearside_sender:
-            self.n_settled += 1
-            if self.n_settled == self.count:
-                self.bail ( None )
-
-
-    def run(self):
-        container = Container(self)
-        container.container_id = 'LinkRouteTest'
-        container.run()
-
-
-
-class ClosestTest ( MessagingHandler ):
-    """
-         Test whether distance-based message routing works in a
-         linear 3-router network.
-
-         sender -----> NEAR -----> MID -----> FAR
-                        |           |          |
-                        v           v          v
-                       near        mid        far
-                       rcvrs       rcvrs      rcvrs
-
-         With a linear network of 3 routers, set up a sender on the
-         near one, and then 2 receivers each on the near, middle, and
-         far routers.
-         After the first 10 messages have been received, close the
-         near routers and check results so far.  All 10 messages should
-         have gone to the near receivers, and none to the mid or far
-         receivers.
-         After the next 10 messages have been received, close the two
-         middle routers and check again.  All 10 messages should have
-         gone to the middle receivers, and none to the far ones.
-         Finally, after another 10 messages have been received, check
-         that they went to the far receivers.
-    """
-    def __init__ ( self, near_router, mid_router, far_router, addr_suffix ):
-        super ( ClosestTest, self ).__init__(prefetch=0)
-        self.error         = None
-        self.near_router   = near_router
-        self.mid_router    = mid_router
-        self.far_router    = far_router
-        self.addr_suffix   = addr_suffix
-        self.dest          = "closest/" + addr_suffix
-
-        # This n_expected is actually the minimum number of messages
-        # I will send.  The real number will be higher because some
-        # will be released when I close the near and middle receivers.
-        self.n_expected = 150
-        self.one_third  = self.n_expected / 3
-
-        # n_received -- the grand total -- is used to decide when to
-        # close the near receivers and later the middle ones.
-        self.n_received    = 0
-
-        # Counters for the near, middle, and far receivers are used
-        # to determine whether there has been an error.
-        self.near_1 = 0
-        self.near_2 = 0
-        self.mid_1  = 0
-        self.mid_2  = 0
-        self.far_1  = 0
-        self.far_2  = 0
-
-
-    def timeout ( self ):
-        self.check_results ( )
-        self.bail ( "Timeout Expired " )
-
-
-    def bail ( self, text ):
-        self.timer.cancel()
-        self.error = text
-        self.send_cnx.close()
-        self.near_cnx.close()
-        self.mid_cnx.close()
-        self.far_cnx.close()
-
-
-    def on_start ( self, event ):
-        self.timer       = event.reactor.schedule  ( TIMEOUT, Timeout(self) )
-        self.send_cnx    = event.container.connect ( self.near_router )
-        self.near_cnx    = event.container.connect ( self.near_router )
-        self.mid_cnx     = event.container.connect ( self.mid_router )
-        self.far_cnx     = event.container.connect ( self.far_router )
-
-        self.sender      = event.container.create_sender   ( self.send_cnx, 
self.dest)
-
-        # The two receiver-links on each router must be given
-        # explicit distinct names, or we will in fact only get
-        # one link.  And then wonder why receiver 2 on each
-        # router isn't getting any messages.
-        self.near_recv_1 = event.container.create_receiver  ( self.near_cnx, 
self.dest, name="1" )
-        self.near_recv_2 = event.container.create_receiver  ( self.near_cnx, 
self.dest, name="2" )
-
-        self.mid_recv_1  = event.container.create_receiver  ( self.mid_cnx,  
self.dest, name="1" )
-        self.mid_recv_2  = event.container.create_receiver  ( self.mid_cnx,  
self.dest, name="2" )
-
-        self.far_recv_1  = event.container.create_receiver  ( self.far_cnx,  
self.dest, name="1" )
-        self.far_recv_2  = event.container.create_receiver  ( self.far_cnx,  
self.dest, name="2" )
-
-        self.near_recv_1.flow ( self.n_expected )
-        self.mid_recv_1.flow  ( self.n_expected )
-        self.far_recv_1.flow  ( self.n_expected )
-
-        self.near_recv_2.flow ( self.n_expected )
-        self.mid_recv_2.flow  ( self.n_expected )
-        self.far_recv_2.flow  ( self.n_expected )
-
-
-    def on_sendable ( self, event ):
-        msg = Message ( body     = "Hello, closest.",
-                        address  = self.dest
-                      )
-        event.sender.send ( msg )
-
-
-    def on_message ( self, event ):
-
-        self.n_received += 1
-
-        # Increment the near, mid, or far counts, depending on
-        # which receiver the message came in on.
-        if event.receiver == self.near_recv_1:
-            self.near_1        += 1
-        elif event.receiver == self.near_recv_2:
-            self.near_2        += 1
-        elif event.receiver == self.mid_recv_1:
-            self.mid_1         += 1
-        elif event.receiver == self.mid_recv_2:
-            self.mid_2         += 1
-        elif event.receiver == self.far_recv_1:
-            self.far_1         += 1
-        elif event.receiver == self.far_recv_2:
-            self.far_2         += 1
-
-        if self.n_received == self.one_third:
-            # The first one-third of messages should have gone exclusively
-            # to the near receivers.  At this point we should have
-            # no messages in the mid or far receivers.
-            self.near_recv_1.close()
-            self.near_recv_2.close()
-            if self.mid_1 + self.mid_2 + self.far_1 + self.far_2 > 0 :
-                self.bail ( "error: mid or far receivers got messages before 
near were closed." )
-            # Make sure we got one third of the messages.
-            if self.near_1 + self.near_2 < self.one_third:
-                self.bail ( "error: the near receivers got too few messages." )
-            # Make sure both receivers got some messages.
-            if self.near_1 * self.near_2 == 0:
-                self.bail ( "error: one of the near receivers got no 
messages." )
-
-        elif self.n_received == 2 * self.one_third:
-            # The next one-third of messages should have gone exclusively
-            # to the mid receivers.  At this point we should have
-            # no messages in the far receivers.
-            self.mid_recv_1.close()
-            self.mid_recv_2.close()
-            if self.far_1 + self.far_2 > 0 :
-                self.bail ( "error: far receivers got messages before mid were 
closed." )
-            # Make sure we got one third of the messages.
-            if self.mid_1 + self.mid_2 < self.one_third:
-                self.bail ( "error: the mid receivers got too few messages." )
-            # Make sure both receivers got some messages.
-            if self.mid_1 * self.mid_2 == 0:
-                self.bail ( "error: one of the mid receivers got no messages." 
)
-
-        # By the time we reach the expected number of messages
-        # we have closed the near and middle receivers.  If the far
-        # receivers are empty at this point, something is wrong.
-        if self.n_received >= self.n_expected :
-            if self.far_1 + self.far_2 < self.one_third:
-                self.bail ( "error: the far receivers got too few messages." )
-            if self.far_1 * self.far_2 == 0:
-                self.bail ( "error: one of the far receivers got no messages." 
)
-            else:
-                self.bail ( None )
-
-
-    def run(self):
-        container = Container(self)
-        container.run()
-
-
-
-
-if __name__ == '__main__':
-    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to