Repository: qpid-dispatch Updated Branches: refs/heads/master e494da0a9 -> 58ecc97bc
DISPATCH-209 : first topology test Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58ecc97b Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58ecc97b Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58ecc97b Branch: refs/heads/master Commit: 58ecc97bcc3d9f89f62d308d3c9b01027151f5e3 Parents: e494da0 Author: mgoulish <mgoul...@redhat.com> Authored: Wed Nov 29 11:49:59 2017 -0500 Committer: mgoulish <mgoul...@redhat.com> Committed: Wed Nov 29 11:49:59 2017 -0500 ---------------------------------------------------------------------- tests/CMakeLists.txt | 1 + tests/system_tests_topology.py | 609 ++++++++++++++++++++++++++++++++++++ 2 files changed, 610 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index aaf65ec..0c6454c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -100,6 +100,7 @@ foreach(py_test_module system_tests_denied_unsettled_multicast system_tests_auth_service_plugin system_tests_delivery_abort + system_tests_topology ${SYSTEM_TESTS_HTTP} ) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58ecc97b/tests/system_tests_topology.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_topology.py b/tests/system_tests_topology.py new file mode 100644 index 0000000..5e58483 --- /dev/null +++ b/tests/system_tests_topology.py @@ -0,0 +1,609 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest, os, json +from subprocess import PIPE, STDOUT +from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED, SSLDomain, SSLUnavailable, Timeout +from system_test import TestCase, Qdrouterd, main_module, DIR, TIMEOUT, Process +from proton.handlers import MessagingHandler +from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node + +import time +import datetime +import pdb + + + +# PROTON-828: +try: + from proton import MODIFIED +except ImportError: + from proton import PN_STATUS_MODIFIED as MODIFIED + + + + +#------------------------------------------------ +# Helper classes for all tests. +#------------------------------------------------ + +class Timeout(object): + """ + Named timeout object can handle multiple simultaneous + timers, by telling the parent which one fired. + """ + def __init__ ( self, parent, name ): + self.parent = parent + self.name = name + + def on_timer_task ( self, event ): + self.parent.timeout ( self.name ) + + + +class ManagementMessageHelper ( object ): + """ + Format management messages. + """ + def __init__ ( self, reply_addr ): + self.reply_addr = reply_addr + + def make_connector_query ( self, connector_name ): + props = {'operation': 'READ', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name } + msg = Message ( properties=props, reply_to=self.reply_addr ) + return msg + + def make_connector_delete_command ( self, connector_name ): + props = {'operation': 'DELETE', 'type': 'org.apache.qpid.dispatch.connector', 'name' : connector_name } + msg = Message ( properties=props, reply_to=self.reply_addr ) + return msg + + +#------------------------------------------------ +# END Helper classes for all tests. +#------------------------------------------------ + + + + + +#================================================================ +# Setup +#================================================================ + +class TopologyTests ( TestCase ): + + @classmethod + def setUpClass(cls): + super(TopologyTests, cls).setUpClass() + + + + def router(name, more_config): + + config = [ ('router', {'mode': 'interior', 'id': name}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) + ] \ + + more_config + + config = Qdrouterd.Config(config) + + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + A_client_port = cls.tester.get_port() + B_client_port = cls.tester.get_port() + C_client_port = cls.tester.get_port() + D_client_port = cls.tester.get_port() + + A_inter_router_port = cls.tester.get_port() + B_inter_router_port = cls.tester.get_port() + C_inter_router_port = cls.tester.get_port() + + # + # + # Topology of the 4-mesh, with costs of connections marked. + # Tail of arrow indicates initiator of connection. + # + # 1 + # D ----------> A + # | \ > ^ + # | 20\ 50/ | + # | \ / | + # 1 | / \ | 100 + # | / \ | + # v / > | + # C ----------> B + # 1 + # + # Test 1 TopologyFailover Notes + # + # 1. Messages are always sent from A, and go to B. + # 2. First route ahould be ADCB. + # 3. Then we kill connector CD. + # 4. Next route should be ADB. + # 5. Then we kill connector BD. + # 6. Next route should be ACB. + # 7. Then we kill connector BC. + # 8. Final route should be AB. + + cls.A_B_cost = 100 + cls.A_C_cost = 50 + cls.A_D_cost = 1 + cls.B_C_cost = 1 + cls.B_D_cost = 20 + cls.C_D_cost = 1 + + router ( 'A', + [ + ( 'listener', + { 'port': A_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ( 'listener', + { 'role': 'inter-router', + 'port': A_inter_router_port, + 'stripAnnotations': 'no' + } + ) + ] + ) + + + router ( 'B', + [ + ( 'listener', + { 'port': B_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ( 'listener', + { 'role': 'inter-router', + 'port': B_inter_router_port, + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'AB_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'verifyHostName': 'no', + 'cost': cls.A_B_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + + router ( 'C', + [ + ( 'listener', + { 'port': C_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ( 'listener', + { 'role': 'inter-router', + 'port': C_inter_router_port, + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'AC_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'verifyHostName': 'no', + 'cost' : cls.A_C_cost, + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'BC_connector', + 'role': 'inter-router', + 'port': B_inter_router_port, + 'verifyHostName': 'no', + 'cost' : cls.B_C_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + + router ( 'D', + [ + ( 'listener', + { 'port': D_client_port, + 'role': 'normal', + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'AD_connector', + 'role': 'inter-router', + 'port': A_inter_router_port, + 'verifyHostName': 'no', + 'cost' : cls.A_D_cost, + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'BD_connector', + 'role': 'inter-router', + 'port': B_inter_router_port, + 'verifyHostName': 'no', + 'cost' : cls.B_D_cost, + 'stripAnnotations': 'no' + } + ), + ( 'connector', + { 'name': 'CD_connector', + 'role': 'inter-router', + 'port': C_inter_router_port, + 'verifyHostName': 'no', + 'cost' : cls.C_D_cost, + 'stripAnnotations': 'no' + } + ) + ] + ) + + + router_A = cls.routers[0] + router_B = cls.routers[1] + router_C = cls.routers[2] + router_D = cls.routers[3] + + router_A.wait_router_connected('B') + router_A.wait_router_connected('C') + router_A.wait_router_connected('D') + + cls.client_addrs = ( router_A.addresses[0], + router_B.addresses[0], + router_C.addresses[0], + router_D.addresses[0] + ) + + # 1 means skip that test. + cls.skip = { 'test_01' : 0 + } + + + + def test_01_topology_failover ( self ): + name = 'test_01' + if self.skip [ name ] : + self.skipTest ( "Test skipped during development." ) + test = TopologyFailover ( name, + self.client_addrs, + "closest/01" + ) + test.run() + self.assertEqual ( None, test.error ) + + + +#================================================================ +# Tests +#================================================================ + + +# Also see 'Test 1 TopologyFailover Notes', above. + +class TopologyFailover ( MessagingHandler ): + """ + Test that the lowest-cost route is always chosen in a 4-mesh + network topology, as one link after another is lost. + + This test also ensures that connections that have been + deliberately severed do no get restored. + """ + def __init__ ( self, test_name, client_addrs, destination ): + super(TopologyFailover, self).__init__(prefetch=0) + self.client_addrs = client_addrs + self.dest = destination + self.error = None + self.sender = None + self.receiver = None + self.test_timer = None + self.send_timer = None + self.n_sent = 0 + self.n_received = 0 + self.n_accepted = 0 + self.n_released = 0 + self.reactor = None + self.state = None + self.send_conn = None + self.recv_conn = None + self.nap_time = 2 + self.debug = False + + # Holds the management sender, receiver, and 'helper' + # associated with each router. + self.routers = { + 'A' : dict(), + 'B' : dict(), + 'C' : dict(), + 'D' : dict() + } + + # These are the expectes routing traces, in the order we + # expect to receive them. + self.expected_traces = [ + [u'0/A', u'0/D', u'0/C', u'0/B'], + [u'0/A', u'0/D', u'0/B'], + [u'0/A', u'0/C', u'0/B'], + [u'0/A', u'0/B'] + ] + self.trace_count = 0 + + # This tells the system in what order to kill the connectors. + self.kill_list = ( + ( 'D', 'CD_connector' ), + ( 'D', 'BD_connector' ), + ( 'C', 'BC_connector' ) + ) + + # Use this to keep track of which connectors we have found + # when the test is first getting started and we are checking + # the topology. + self.connectors_map = { 'AB_connector' : 0, + 'AC_connector' : 0, + 'AD_connector' : 0, + 'BC_connector' : 0, + 'BD_connector' : 0, + 'CD_connector' : 0 + } + + + # The simple state machine transitions when certain events happen, + # if certain conditions are met. The conditions are checked for + # by the callbacks for the events. + # The normal sequence of states in the state machine is: + # 1. starting -- doesn't do anything + # 2. checking -- checks initial topology + # 3. examine_trace -- look at routing trace of first message + # 4. kill_connector -- kills the first connector (CD) + # 5. examine_trace -- checks routing trace of next message + # 5. kill_connector -- kills the next connector (BD) + # 5. examine_trace -- checks routing trace of next message + # 5. kill_connector -- kills the next connector (BC) + # 5. examine_trace -- checks routing trace of final message + # 5. bailing -- bails out with success + + + def state_transition ( self, message, new_state ) : + if self.state == new_state : + return + self.state = new_state + self.debug_print ( "state transition to : %s -- because %s" % ( self.state, message ) ) + + + def debug_print ( self, text ) : + if self.debug == True: + print time.time(), text + + + # Shut down everything and exit. + def bail ( self, text ): + self.error = text + + self.send_conn.close ( ) + self.recv_conn.close ( ) + + self.routers['B'] ['mgmt_conn'].close() + self.routers['C'] ['mgmt_conn'].close() + self.routers['D'] ['mgmt_conn'].close() + + self.test_timer.cancel ( ) + self.send_timer.cancel ( ) + + + #------------------------------------------------------------------------ + # I want some behavior from this test that is a little too complex + # to be governed by the usual callback functions. The way I do this + # is by making a simple state machine that checks some conditions + # during some callback, and then either steps forward or terminates + # the test. + # The callbacks that activate the state machine are mostly on_message, + # or timeout. But there are two different timers: the one-second + # timer that mostly runs the test, and the 60-second timer that, if it + # fires, will terminate the test with a timeout error. + #------------------------------------------------------------------------ + def timeout ( self, name ): + if name == 'test': + self.set_state ( 'Timeout Expired', 'bailing' ) + self.bail ( "Timeout Expired: n_sent=%d n_received=%d n_accepted=%d" % \ + ( self.n_sent, self.n_received, self.n_accepted ) ) + elif name == 'sender': + if self.state == 'examine_trace' : + self.send ( ) + self.send_timer = self.reactor.schedule(1, Timeout(self, "sender")) + + + def on_start ( self, event ): + self.state_transition ( 'on_start', 'starting' ) + self.reactor = event.reactor + self.test_timer = event.reactor.schedule ( 60, Timeout(self, "test") ) + self.send_timer = event.reactor.schedule ( 1, Timeout(self, "sender") ) + self.send_conn = event.container.connect ( self.client_addrs[0] ) # A + self.recv_conn = event.container.connect ( self.client_addrs[1] ) # B + + self.sender = event.container.create_sender ( self.send_conn, self.dest ) + self.receiver = event.container.create_receiver ( self.recv_conn, self.dest ) + self.receiver.flow ( 100 ) + + # I will only send management messages to B, C, and D, because + # they are the owners of the connections that I will want to delete. + self.routers['B'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[1] ) + self.routers['C'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[2] ) + self.routers['D'] ['mgmt_conn'] = event.container.connect ( self.client_addrs[3] ) + + self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True ) + self.routers['C'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['C'] ['mgmt_conn'], dynamic=True ) + self.routers['D'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['D'] ['mgmt_conn'], dynamic=True ) + + self.routers['B'] ['mgmt_sender'] = event.container.create_sender ( self.routers['B'] ['mgmt_conn'], "$management" ) + self.routers['C'] ['mgmt_sender'] = event.container.create_sender ( self.routers['C'] ['mgmt_conn'], "$management" ) + self.routers['D'] ['mgmt_sender'] = event.container.create_sender ( self.routers['D'] ['mgmt_conn'], "$management" ) + + + + #----------------------------------------------------------------- + # At start-time, as the links to the three managed routers + # open, check each one to make sure that it has all the expected + # connections. + #----------------------------------------------------------------- + def on_link_opened ( self, event ) : + self.state_transition ( 'on_link_opened', 'checking' ) + # The B mgmt link has opened. Check its connections. -------------------------- + if event.receiver == self.routers['B'] ['mgmt_receiver'] : + event.receiver.flow ( 1000 ) + self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address ) + for connector in [ 'AB_connector' ] : + self.connector_check ( 'B', connector ) + # The C mgmt link has opened. Check its connections. -------------------------- + elif event.receiver == self.routers['C'] ['mgmt_receiver'] : + event.receiver.flow ( 1000 ) + self.routers['C'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address ) + for connector in [ 'AC_connector', 'BC_connector' ] : + self.connector_check ( 'C', connector ) + # The D mgmt link has opened. Check its connections. -------------------------- + elif event.receiver == self.routers['D'] ['mgmt_receiver']: + event.receiver.flow ( 1000 ) + self.routers['D'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address ) + for connector in [ 'AD_connector', 'BD_connector', 'CD_connector' ] : + self.connector_check ( 'D', connector ) + + + def send ( self ): + n_sent_this_time = 0 + if self.sender.credit <= 0: + self.receiver.flow ( 100 ) + return + # Send messages one at a time. + if self.sender.credit > 0 : + msg = Message ( body=self.n_sent ) + self.sender.send ( msg ) + n_sent_this_time += 1 + self.n_sent += 1 + self.debug_print ( "sent: %d" % self.n_sent ) + + + def on_message ( self, event ): + + if event.receiver == self.routers['B'] ['mgmt_receiver'] or \ + event.receiver == self.routers['C'] ['mgmt_receiver'] or \ + event.receiver == self.routers['D'] ['mgmt_receiver'] : + + #---------------------------------------------------------------- + # This is a management message. + #---------------------------------------------------------------- + if self.state == 'checking' : + connection_name = event.message.body['name'] + + if connection_name in self.connectors_map : + self.connectors_map [ connection_name ] = 1 + else : + self.state_transition ( "bad connection name: %s" % connection_name, 'bailing' ) + self.bail ( "bad connection name: %s" % connection_name ) + + n_connections = sum(self.connectors_map.values()) + if n_connections == 6 : + self.state_transition ( "all %d connections found" % n_connections, 'examine_trace' ) + elif self.state == 'kill_connector' : + if event.message.properties["statusDescription"] == 'No Content': + # We are in the process of killing a connector, and + # have received the response to the kill message. + self.state_transition ( 'got kill response', 'examine_trace' ) + # This sleep is here because one early bug that this test found + # (and which is now fixed) involved connections that had been + # deleted coming back sometimes. It was a race and only happened + # very occasionally -- but with a pause here, after getting + # confirmation that we have successfully deleted the connector, + # the bug would show up 60 to 75% of the time. I think that leaving + # this sleep here is the only way to ensure that that particular + # bug stays fixed. + time.sleep ( self.nap_time ) + else: + #---------------------------------------------------------------- + # This is a payload message. + #---------------------------------------------------------------- + self.n_received += 1 + if self.state == 'examine_trace' : + trace = event.message.annotations [ 'x-opt-qd.trace' ] + expected = self.expected_traces [ self.trace_count ] + if trace == expected : + if self.trace_count == len(self.expected_traces) - 1 : + self.state_transition ( 'final expected trace %s observed' % expected, 'bailing' ) + self.bail ( None ) + return + self.state_transition ( "expected trace %d observed successfully %s" % ( self.trace_count, expected ) , 'kill_connector' ) + self.kill_a_connector ( self.kill_list[self.trace_count] ) + self.trace_count += 1 + else : + self.state_transition ( "expected trace %s but got %s" % ( expected, trace ), 'bailing' ) + self.bail ( "expected trace %s but got %s" % ( expected, trace ) ) + + + def on_accepted ( self, event ): + self.n_accepted += 1 + + + def on_released ( self, event ) : + self.n_released += 1 + + + def connector_check ( self, router, connector ) : + self.debug_print ( "checking connector for router %s" % router ) + mgmt_helper = self.routers[router] ['mgmt_helper'] + mgmt_sender = self.routers[router] ['mgmt_sender'] + msg = mgmt_helper.make_connector_query ( connector ) + mgmt_sender.send ( msg ) + + + def kill_a_connector ( self, target ) : + router = target[0] + connector = target[1] + self.debug_print ( "killing connector %s on router %s" % (connector, router) ) + mgmt_helper = self.routers[router] ['mgmt_helper'] + mgmt_sender = self.routers[router] ['mgmt_sender'] + msg = mgmt_helper.make_connector_delete_command ( connector ) + mgmt_sender.send ( msg ) + + + + def run(self): + Container(self).run() + + + + +if __name__ == '__main__': + unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org