Repository: qpid-dispatch
Updated Branches:
  refs/heads/master b38e63edf -> 889644675


DISPATCH-209 : add multicast {linear,mesh} tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/88964467
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/88964467
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/88964467

Branch: refs/heads/master
Commit: 8896446756dc158673c9222e8bdb13bf41bfbc97
Parents: b38e63e
Author: mick goulish <mgoul...@redhat.com>
Authored: Fri Aug 18 16:39:19 2017 -0400
Committer: mick goulish <mgoul...@redhat.com>
Committed: Fri Aug 18 16:39:19 2017 -0400

----------------------------------------------------------------------
 tests/system_tests_distribution.py | 207 +++++++++++++++++++++++++++++++-
 1 file changed, 203 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/88964467/tests/system_tests_distribution.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_distribution.py 
b/tests/system_tests_distribution.py
index 5cfc23d..c244c54 100644
--- a/tests/system_tests_distribution.py
+++ b/tests/system_tests_distribution.py
@@ -121,8 +121,9 @@ class DistributionTests ( TestCase ):
         def router(name, more_config):
 
             config = [ ('router',  {'mode': 'interior', 'id': name}),
-                       ('address', {'prefix': 'closest',  'distribution': 
'closest'}),
-                       ('address', {'prefix': 'balanced', 'distribution': 
'balanced'})
+                       ('address', {'prefix': 'closest',   'distribution': 
'closest'}),
+                       ('address', {'prefix': 'balanced',  'distribution': 
'balanced'}),
+                       ('address', {'prefix': 'multicast', 'distribution': 
'multicast'})
                      ] + more_config
 
             config = Qdrouterd.Config(config)
@@ -422,7 +423,7 @@ class DistributionTests ( TestCase ):
                              "addr_08"
                            )
         test.run()
-        self.assertEqual(None, test.error)
+        self.assertEqual ( None, test.error )
 
 
     def test_09_closest_mesh ( self ):
@@ -524,7 +525,7 @@ class DistributionTests ( TestCase ):
                               omit_middle_receiver
                             )
         test.run()
-        self.assertEqual(None, test.error)
+        self.assertEqual ( None, test.error )
 
 
     def test_11_balanced_linear_omit_middle_receiver ( self ):
@@ -638,6 +639,26 @@ class DistributionTests ( TestCase ):
         self.assertEqual ( None, test.error )
 
 
+    def test_13_multicast_linear ( self ):
+        test = MulticastTest ( self.A_addr,
+                               self.B_addr,
+                               self.C_addr,
+                               "addr_13"
+                             )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
+    def test_14_multicast_mesh ( self ):
+        test = MulticastTest ( self.A_addr,
+                               self.B_addr,
+                               self.D_addr,
+                               "addr_14"
+                             )
+        test.run()
+        self.assertEqual ( None, test.error )
+
+
 
 
 
@@ -1406,5 +1427,183 @@ class BalancedTest ( MessagingHandler ):
 
 
 
+class MulticastTest ( MessagingHandler ):
+    """
+    Using multicast, we should see all receivers get everything,
+    whether the topology is linear or mesh.
+    """
+    def __init__ ( self, router_1, router_2, router_3, addr_suffix ):
+        super ( MulticastTest, self ).__init__(prefetch=0)
+        self.error       = None
+        self.router_1    = router_1
+        self.router_2    = router_2
+        self.router_3    = router_3
+        self.addr_suffix = addr_suffix
+        self.dest        = "multicast/" + addr_suffix
+
+        self.n_to_send = 100
+        self.n_sent    = 0
+
+        self.n_received = 0
+
+        self.count_1_a = 0
+        self.count_1_b = 0
+        self.count_2_a = 0
+        self.count_2_b = 0
+        self.count_3_a = 0
+        self.count_3_b = 0
+
+        self.addr_check_timer    = None
+        self.addr_check_receiver = None
+        self.addr_check_sender   = None
+        self.sender              = None
+        self.bailed = False
+
+    def timeout ( self ):
+        self.check_results ( )
+        self.bail ( "Timeout Expired " )
+
+
+    def address_check_timeout(self):
+        self.addr_check()
+
+
+    def bail ( self, text ):
+        self.timer.cancel()
+        self.error = text
+        self.send_cnx.close()
+        self.cnx_1.close()
+        self.cnx_2.close()
+        self.cnx_3.close()
+        if self.addr_check_timer:
+            self.addr_check_timer.cancel()
+
+
+    def on_start ( self, event ):
+        self.timer    = event.reactor.schedule  ( TIMEOUT, Timeout(self) )
+        self.send_cnx = event.container.connect ( self.router_1 )
+        self.cnx_1    = event.container.connect ( self.router_1 )
+        self.cnx_2    = event.container.connect ( self.router_2 )
+        self.cnx_3    = event.container.connect ( self.router_3 )
+
+        # Warning!
+        # The two receiver-links on each router must be given
+        # explicit distinct names, or we will in fact only get
+        # one link.  And then wonder why receiver 2 on each
+        # router isn't getting any messages.
+        self.recv_1_a  = event.container.create_receiver  ( self.cnx_1, 
self.dest, name="1" )
+        self.recv_1_b  = event.container.create_receiver  ( self.cnx_1, 
self.dest, name="2" )
+
+        self.recv_2_a  = event.container.create_receiver  ( self.cnx_2,  
self.dest, name="1" )
+        self.recv_2_b  = event.container.create_receiver  ( self.cnx_2,  
self.dest, name="2" )
+
+        self.recv_3_a  = event.container.create_receiver  ( self.cnx_3,  
self.dest, name="1" )
+        self.recv_3_b  = event.container.create_receiver  ( self.cnx_3,  
self.dest, name="2" )
+
+        self.recv_1_a.flow ( self.n_to_send )
+        self.recv_2_a.flow ( self.n_to_send )
+        self.recv_3_a.flow ( self.n_to_send )
+
+        self.recv_1_b.flow ( self.n_to_send )
+        self.recv_2_b.flow ( self.n_to_send )
+        self.recv_3_b.flow ( self.n_to_send )
+
+        self.addr_check_receiver = event.container.create_receiver ( 
self.cnx_1, dynamic=True )
+        self.addr_check_sender   = event.container.create_sender ( self.cnx_1, 
"$management" )
+
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow ( self.n_to_send )
+        if event.receiver == self.addr_check_receiver:
+            # my addr-check link has opened: make the addr_checker with the 
given address.
+            self.addr_checker = AddressChecker ( 
self.addr_check_receiver.remote_source.address )
+            self.addr_check()
+
+
+    def on_sendable ( self, event ):
+        if self.sender and self.n_sent < self.n_to_send :
+            msg = Message ( body     = "Hello, closest.",
+                            address  = self.dest
+                          )
+            dlv = self.sender.send ( msg )
+            self.n_sent += 1
+            dlv.settle()
+
+
+    def on_message ( self, event ):
+
+        #if self.bailed is True :
+            #return
+
+        if event.receiver == self.addr_check_receiver:
+            # This is a response to one of my address-readiness checking 
messages.
+            response = 
self.addr_checker.parse_address_query_response(event.message)
+            if response.status_code == 200 and response.subscriberCount == 2 
and response.remoteCount == 2:
+                # now we know that we have two subscribers on attached router, 
and two remote
+                # routers that know about the address. The network is ready.
+                # Now we can make the sender without getting a
+                # "No Path To Destination" error.
+                self.sender = event.container.create_sender ( self.send_cnx, 
self.dest )
+
+                # And we can quit checking.
+                if self.addr_check_timer:
+                    self.addr_check_timer.cancel()
+                    self.addr_check_timer = None
+            else:
+                # If the latest check did not find the link-attack route ready,
+                # schedule another check a little while from now.
+                self.addr_check_timer = event.reactor.schedule(0.25, 
AddressCheckerTimeout(self))
+        else :
+            # This is a payload message.
+            self.n_received += 1
+
+            # Count the messages that have come in for
+            # each receiver.
+            if   event.receiver == self.recv_1_a:
+                self.count_1_a += 1
+            elif event.receiver == self.recv_1_b:
+                self.count_1_b += 1
+            elif event.receiver == self.recv_2_a:
+                self.count_2_a += 1
+            elif event.receiver == self.recv_2_b:
+                self.count_2_b += 1
+            elif event.receiver == self.recv_3_a:
+                self.count_3_a += 1
+            elif event.receiver == self.recv_3_b:
+                self.count_3_b += 1
+
+            if self.n_received >= 6 * self.n_to_send :
+                # In multicast, everybody gets everything.
+                # Our reception count should be 6x our send-count,
+                # and all receiver-counts should be equal.
+                if self.count_1_a == self.count_1_b and self.count_1_b == 
self.count_2_a and self.count_2_a == self.count_2_b and self.count_2_b == 
self.count_3_a and self.count_3_a == self.count_3_b :
+                    self.bail ( None )
+                    self.bailed = True
+                else:
+                    self.bail ( "receivers not equal: %d %d %d %d %d %d" % 
(self.count_1_a, self.count_1_b, self.count_2_a, self.count_2_b, 
self.count_3_a, self.count_3_b) )
+                    self.bailed = True
+
+
+
+    def addr_check ( self ):
+        # Send the message that will query the management code to discover
+        # information about our destination address. We cannot make our payload
+        # sender until the network is ready.
+        #
+        # BUGALERT: We have to prepend the 'M0' to this address prefix
+        # because that's what the router does internally.  Someday this
+        # may change.
+        self.addr_check_sender.send ( 
self.addr_checker.make_address_query("M0" + self.dest) )
+
+
+    def run(self):
+        container = Container(self)
+        container.run()
+
+
+
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to