This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new b594672 DISPATCH-1154: update link route proxy tests, add missing delivery reference b594672 is described below commit b594672e41856e626d3b15e849cf39bced836b3a Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Wed Dec 12 11:50:40 2018 -0500 DISPATCH-1154: update link route proxy tests, add missing delivery reference --- src/router_core/core_client_api.c | 9 ++ tests/system_test.py | 25 +++- tests/system_tests_edge_router.py | 234 ++++++++++++++++++++++++-------------- tests/test_broker.py | 32 ++---- 4 files changed, 188 insertions(+), 112 deletions(-) diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c index 9ef1c4e..7483869 100644 --- a/src/router_core/core_client_api.c +++ b/src/router_core/core_client_api.c @@ -286,6 +286,7 @@ static void _flush_send_queue_CT(qdrc_client_t *client) req->delivery = qdrc_endpoint_delivery_CT(client->core, client->sender, msg); + qdr_delivery_incref(req->delivery, "core client send request"); qdrc_endpoint_send_CT(client->core, client->sender, req->delivery, @@ -347,6 +348,10 @@ static void _free_request_CT(qdrc_client_t *client, qd_compose_free(req->app_properties); } + if (req->delivery) { + qdr_delivery_decref_CT(client->core, req->delivery, "core client send request"); + } + // notify user that the request has completed if (req->done_cb) { req->done_cb(client->core, @@ -478,6 +483,10 @@ static void _sender_update_CT(void *context, DEQ_REMOVE_N(UNSETTLED, client->unsettled_list, req); req->on_unsettled_list = false; + // delivery no longer needed + qdr_delivery_decref_CT(client->core, req->delivery, "core client send request"); + req->delivery = 0; + if (!req->on_reply_list || disposition != PN_ACCEPTED) { // no reply is coming, release the request _free_request_CT(client, req, NULL); diff --git a/tests/system_test.py b/tests/system_test.py index 354eac6..bec6b7f 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -43,6 +43,7 @@ except ImportError: from threading import Thread from threading import Event import json +import uuid import proton from proton import Message, Timeout @@ -720,11 +721,12 @@ class AsyncTestReceiver(MessagingHandler): messages. Messages can be retrieved from this thread via the queue member. :param wait: block the constructor until the link has been fully established. + :param recover_link: restart on remote link detach """ Empty = Queue.Empty def __init__(self, address, source, conn_args=None, container_id=None, - wait=True): + wait=True, recover_link=False): super(AsyncTestReceiver, self).__init__() self.address = address self.source = source @@ -732,9 +734,11 @@ class AsyncTestReceiver(MessagingHandler): self.queue = Queue.Queue() self._conn = None self._container = Container(self) - if container_id is not None: - self._container.container_id = container_id + cid = container_id or "ATR-%s:%s" % (source, uuid.uuid4()) + self._container.container_id = cid self._ready = Event() + self._recover_link = recover_link + self._recover_count = 0 self._stop_thread = False self._thread = Thread(target=self._main) self._thread.daemon = True @@ -770,6 +774,17 @@ class AsyncTestReceiver(MessagingHandler): def on_link_opened(self, event): self._ready.set() + def on_link_closing(self, event): + event.link.close() + if self._recover_link and not self._stop_thread: + # lesson learned: the generated link name will be the same as the + # old link (which is bad) so we specify a new one + self._recover_count += 1 + kwargs = {'source': self.source, + 'name': "%s:%s" % (event.link.name, self._recover_count)} + rcv = event.container.create_receiver(event.connection, + **kwargs) + def on_message(self, event): self.queue.put(event.message) @@ -787,8 +802,8 @@ class AsyncTestSender(MessagingHandler): self._unaccepted = count self._body = body or "test" self._container = Container(self) - if container_id is not None: - self._container.container_id = container_id + cid = container_id or "ATS-%s:%s" % (target, uuid.uuid4()) + self._container.container_id = cid self._thread = Thread(target=self._main) self._thread.daemon = True self._thread.start() diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index 3e2ead5..f9a55d9 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -34,7 +34,6 @@ from system_test import AsyncTestSender from system_test import QdManager from system_tests_link_routes import ConnLinkRouteService from test_broker import FakeService -from test_broker import FakeBrokerStop from proton.handlers import MessagingHandler from proton.reactor import Container, DynamicNodeProperties from proton.utils import BlockingConnection @@ -723,16 +722,19 @@ class LinkRouteProxyTest(TestCase): cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.connector' def _get_address(self, router, address): + """Lookup address in route table""" a_type = 'org.apache.qpid.dispatch.router.address' addrs = router.management.query(a_type).get_dicts() return list(filter(lambda a: a['name'].find(address) != -1, addrs)) def _wait_address_gone(self, router, address): + """Block until address is removed from the route table""" while self._get_address(router, address): sleep(0.1) def _test_traffic(self, sender, receiver, address, count=5): + """Generate message traffic between two normal clients""" tr = AsyncTestReceiver(receiver, address) ts = AsyncTestSender(sender, address, count) ts.wait() # wait until all sent @@ -740,6 +742,91 @@ class LinkRouteProxyTest(TestCase): tr.queue.get(timeout=TIMEOUT) tr.stop() + def _test_attach_weirdness(self, service): + """Exercise a service that simulates link attach failures""" + + # create a consumer, do not wait for link to open, reattach + # on received detach + rx = AsyncTestReceiver(self.EB1.listener, 'CfgLinkRoute1/foo', + wait=False, recover_link=True) + service.link_dropped.wait(timeout=TIMEOUT) + service.join() # wait for thread exit + del service + + # now attach a working service to the same address, + # make sure it all works + fs = FakeService(self.EA1.route_container) + self.INT_B.wait_address("CfgLinkRoute1") + + if True: # + rx.stop() + fs.join() + else: + tx = AsyncTestSender(self.EA1.listener, 'CfgLinkRoute1/foo', + + body="HEY HO LET'S GO!") + tx.wait() + + msg = rx.queue.get(timeout=TIMEOUT) + self.assertTrue(msg.body == "HEY HO LET'S GO!") + rx.stop() + fs.join() + self.assertEqual(1, fs.in_count) + self.assertEqual(1, fs.out_count) + + def test_01_immedate_detach_reattach(self): + """ + Have a service for a link routed address abruptly detach + in response to an incoming link attach + + The attaching client from EB1 will get an attach response then an + immediate detach. The client will immediately re-establish the link. + """ + class AttachDropper(FakeService): + def __init__(self, *args, **kwargs): + super(AttachDropper, self).__init__(*args, **kwargs) + self.link_dropped = Event() + + def on_link_remote_open(self, event): + # drop it + event.link.close() + event.connection.close() + self.link_dropped.set() + + ad = AttachDropper(self.EA1.route_container) + self.INT_B.wait_address("CfgLinkRoute1") + self._test_attach_weirdness(ad) + + def test_02_thrashing_link_routes(self): + """ + Rapidly add and delete link routes at the edge + """ + + # activate the pre-configured link routes + ea1_mgmt = self.EA1.management + fs = FakeService(self.EA1.route_container) + self.INT_B.wait_address("CfgLinkRoute1") + + for i in range(10): + lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, + name="TestLRout%d" % i, + attributes={'pattern': 'Test/*/%d/#' % i, + 'containerId': 'FakeBroker', + 'direction': 'out'}) + lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, + name="TestLRin%d" % i, + attributes={'pattern': 'Test/*/%d/#' % i, + 'containerId': 'FakeBroker', + 'direction': 'in'}) + # verify that they are correctly propagated (once) + if i == 9: + self.INT_B.wait_address("Test/*/9/#") + lr1.delete() + lr2.delete() + + fs.join() + self._wait_address_gone(self.INT_B, "CfgLinkRoute1") + def _validate_topology(self, router, expected_links, address): """ query existing links and verify they are set up as expected @@ -765,7 +852,59 @@ class LinkRouteProxyTest(TestCase): test_links) self.assertTrue(len(matches) == 1) - def test_link_topology(self): + def test_03_interior_conn_lost(self): + """ + What happens when the interior connection bounces? + """ + config = Qdrouterd.Config([('router', {'mode': 'edge', + 'id': 'Edge1'}), + ('listener', {'role': 'normal', + 'port': self.tester.get_port()}), + ('listener', {'name': 'rc', + 'role': 'route-container', + 'port': self.tester.get_port()}), + ('linkRoute', {'pattern': 'Edge1/*', + 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', {'pattern': 'Edge1/*', + 'containerId': 'FakeBroker', + 'direction': 'out'})]) + er = self.tester.qdrouterd('Edge1', config, wait=True) + + # activate the link routes before the connection exists + fs = FakeService(er.addresses[1]) + er.wait_address("Edge1/*") + + # create the connection to interior + er_mgmt = er.management + ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, + name='toA', + attributes={'role': 'edge', + 'port': self.INTA_edge_port}) + self.INT_B.wait_address("Edge1/*") + + # delete it, and verify the routes are removed + ctor.delete() + self._wait_address_gone(self.INT_B, "Edge1/*") + + # now recreate and verify routes re-appear + ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, + name='toA', + attributes={'role': 'edge', + 'port': self.INTA_edge_port}) + self.INT_B.wait_address("Edge1/*") + self._test_traffic(self.INT_B.listener, + self.INT_B.listener, + "Edge1/One", + count=5) + fs.join() + self.assertEqual(5, fs.in_count) + self.assertEqual(5, fs.out_count) + + er.teardown() + self._wait_address_gone(self.INT_B, "Edge1/*") + + def test_50_link_topology(self): """ Verify that the link topology that results from activating a link route and sending traffic is correct @@ -825,7 +964,7 @@ class LinkRouteProxyTest(TestCase): self.assertEqual(1, fs.in_count) self.assertEqual(1, fs.out_count) - def test_link_route_proxy_configured(self): + def test_51_link_route_proxy_configured(self): """ Activate the configured link routes via a FakeService, verify proxies created by passing traffic from/to and interior router @@ -834,10 +973,12 @@ class LinkRouteProxyTest(TestCase): fs = FakeService(self.EA1.route_container) self.INT_B.wait_address("CfgLinkRoute1") + self._test_traffic(self.INT_B.listener, self.INT_B.listener, "CfgLinkRoute1/hi", count=5) + fs.join() self.assertEqual(5, fs.in_count) self.assertEqual(5, fs.out_count) @@ -850,16 +991,19 @@ class LinkRouteProxyTest(TestCase): fs = FakeService(self.EB1.route_container) self.INT_A.wait_address("*.cfg.pattern.#") + self._test_traffic(self.INT_A.listener, self.INT_A.listener, "MATCH.cfg.pattern", count=5) + fs.join() self.assertEqual(5, fs.in_count) self.assertEqual(5, fs.out_count) self._wait_address_gone(self.INT_A, "*.cfg.pattern.#") - def test_conn_link_route_proxy(self): + + def test_52_conn_link_route_proxy(self): """ Test connection scoped link routes by connecting a fake service to the Edge via the route-container connection. Have the fake service @@ -896,88 +1040,6 @@ class LinkRouteProxyTest(TestCase): self._wait_address_gone(self.INT_A, "Conn/*/One") - def test_interior_conn_lost(self): - """ - What happens when the interior connection bounces? - """ - config = Qdrouterd.Config([('router', {'mode': 'edge', - 'id': 'Edge1'}), - ('listener', {'role': 'normal', - 'port': self.tester.get_port()}), - ('listener', {'name': 'rc', - 'role': 'route-container', - 'port': self.tester.get_port()}), - ('linkRoute', {'pattern': 'Edge1/*', - 'containerId': 'FakeBroker', - 'direction': 'in'}), - ('linkRoute', {'pattern': 'Edge1/*', - 'containerId': 'FakeBroker', - 'direction': 'out'})]) - er = self.tester.qdrouterd('Edge1', config, wait=True) - - # activate the link routes before the connection exists - fs = FakeService(er.addresses[1]) - er.wait_address("Edge1/*") - - # create the connection to interior - er_mgmt = er.management - ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, - name='toA', - attributes={'role': 'edge', - 'port': self.INTA_edge_port}) - self.INT_B.wait_address("Edge1/*") - - # delete it, and verify the routes are removed - ctor.delete() - self._wait_address_gone(self.INT_B, "Edge1/*") - - # now recreate and verify routes re-appear - ctor = er_mgmt.create(type=self.CONNECTOR_TYPE, - name='toA', - attributes={'role': 'edge', - 'port': self.INTA_edge_port}) - self.INT_B.wait_address("Edge1/*") - self._test_traffic(self.INT_B.listener, - self.INT_B.listener, - "Edge1/One", - count=5) - fs.join() - self.assertEqual(5, fs.in_count) - self.assertEqual(5, fs.out_count) - - er.teardown() - self._wait_address_gone(self.INT_B, "Edge1/*") - - def test_thrashing_link_routes(self): - """ - Rapidly add and delete link routes at the edge - """ - - # activate the pre-configured link routes - ea1_mgmt = self.EA1.management - fs = FakeService(self.EA1.route_container) - self.INT_B.wait_address("CfgLinkRoute1") - - for i in range(10): - lr1 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, - name="TestLRout%d" % i, - attributes={'pattern': 'Test/*/%d/#' % i, - 'containerId': 'FakeBroker', - 'direction': 'out'}) - lr2 = ea1_mgmt.create(type=self.CFG_LINK_ROUTE_TYPE, - name="TestLRin%d" % i, - attributes={'pattern': 'Test/*/%d/#' % i, - 'containerId': 'FakeBroker', - 'direction': 'in'}) - # verify that they are correctly propagated (once) - if i == 9: - self.INT_B.wait_address("Test/*/9/#") - lr1.delete() - lr2.delete() - - fs.join() - self._wait_address_gone(self.INT_B, "CfgLinkRoute1") - class Timeout(object): def __init__(self, parent): diff --git a/tests/test_broker.py b/tests/test_broker.py index a05df02..df60626 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -39,11 +39,6 @@ from proton.reactor import AtMostOnce from system_test import TIMEOUT -class FakeBrokerStop(Exception): - """stop the broker from a handler callback""" - pass - - class FakeBroker(MessagingHandler): """ A fake broker-like service that listens for client connections @@ -110,22 +105,17 @@ class FakeBroker(MessagingHandler): self._container.timeout = 1.0 self._container.start() - try: - while self._container.process(): - if self._stop_thread: - break - - if self.acceptor: - self.acceptor.close() - self.acceptor = None - for c in self._connections: - c.close() - self._connections = [] - self._container.process() - except FakeBrokerStop: - # this abruptly kills the broker useful to test how dispatch deals - # with hung/stopped containers - pass + while self._container.process(): + if self._stop_thread: + break + + if self.acceptor: + self.acceptor.close() + self.acceptor = None + for c in self._connections: + c.close() + self._connections = [] + self._container.process() def join(self): self._stop_thread = True --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org