This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit ea32ab17c4d977edcc9f6cf7f1f52565cb7b440a Author: Gordon Sim <g...@redhat.com> AuthorDate: Thu Sep 19 17:14:23 2019 +0100 DISPATCH-1428: allow a route-container connection to be looked up by connector name, even if the container id is the same as a connection from a different connector --- src/router_core/route_control.c | 94 +++++++++++++++--------- src/router_core/router_core_private.h | 1 + tests/system_tests_link_routes.py | 130 ++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 34 deletions(-) diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 3249148..9ec0023 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -549,22 +549,8 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al) qdr_core_delete_auto_link(core, al); } - -void qdr_route_connection_opened_CT(qdr_core_t *core, - qdr_connection_t *conn, - qdr_field_t *container_field, - qdr_field_t *connection_field) +static void activate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) { - if (conn->role != QDR_ROLE_ROUTE_CONTAINER) - return; - - qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, - container_field?container_field->iterator:0, connection_field?connection_field->iterator:0); - - qdr_add_connection_ref(&cid->connection_refs, conn); - - conn->conn_id = cid; - // // Activate all link-routes associated with this remote container. // @@ -584,24 +570,8 @@ void qdr_route_connection_opened_CT(qdr_core_t *core, } } - -void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) +static void deactivate_route_connection(qdr_core_t *core, qdr_connection_t *conn, qdr_conn_identifier_t *cid) { - // - // release any connection-based link routes. These can exist on - // QDR_ROLE_NORMAL connections. 
- // - while (DEQ_HEAD(conn->conn_link_routes)) { - qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes); - // removes the link route from conn->link_routes - qdr_route_del_conn_route_CT(core, lr); - } - - if (conn->role != QDR_ROLE_ROUTE_CONTAINER) - return; - - qdr_conn_identifier_t *cid = conn->conn_id; - if (cid) { // // Deactivate all link-routes associated with this remote container. // @@ -625,9 +595,65 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) // qdr_del_connection_ref(&cid->connection_refs, conn); - conn->conn_id = 0; - qdr_route_check_id_for_deletion_CT(core, cid); +} + +void qdr_route_connection_opened_CT(qdr_core_t *core, + qdr_connection_t *conn, + qdr_field_t *container_field, + qdr_field_t *connection_field) +{ + if (conn->role != QDR_ROLE_ROUTE_CONTAINER) + return; + + if (connection_field) { + qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, 0, connection_field->iterator); + qdr_add_connection_ref(&cid->connection_refs, conn); + conn->conn_id = cid; + activate_route_connection(core, conn, conn->conn_id); + if (container_field) { + cid = qdr_route_declare_id_CT(core, container_field->iterator, 0); + if (cid != conn->conn_id) { + //the connection and container may be indexed to different objects if + //there are multiple distinctly named connectors which connect to the + //same amqp container + qdr_add_connection_ref(&cid->connection_refs, conn); + conn->alt_conn_id = cid; + activate_route_connection(core, conn, conn->alt_conn_id); + } + } + } else { + qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, container_field->iterator, 0); + qdr_add_connection_ref(&cid->connection_refs, conn); + conn->conn_id = cid; + activate_route_connection(core, conn, conn->conn_id); + } +} + +void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn) +{ + // + // release any connection-based link routes. These can exist on + // QDR_ROLE_NORMAL connections. 
+ // + while (DEQ_HEAD(conn->conn_link_routes)) { + qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes); + // removes the link route from conn->link_routes + qdr_route_del_conn_route_CT(core, lr); + } + + if (conn->role != QDR_ROLE_ROUTE_CONTAINER) + return; + + qdr_conn_identifier_t *cid = conn->conn_id; + if (cid) { + deactivate_route_connection(core, conn, cid); + conn->conn_id = 0; + } + cid = conn->alt_conn_id; + if (cid) { + deactivate_route_connection(core, conn, cid); + conn->alt_conn_id = 0; } } diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index ef137d6..e08c70a 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -635,6 +635,7 @@ struct qdr_connection_t { qdr_connection_role_t role; int inter_router_cost; qdr_conn_identifier_t *conn_id; + qdr_conn_identifier_t *alt_conn_id; bool strip_annotations_in; bool strip_annotations_out; bool policy_allow_dynamic_link_routes; diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 33658b4..bd204ab 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -2052,6 +2052,136 @@ class InvalidTagTest(MessagingHandler): def run(self): Container(self).run() +class Dispatch1428(TestCase): + """ + Sets up 2 routers (one of which is acting as a broker (QDR.A)).
+ + QDR.A acting broker #1 + +---------+ +---------+ + | | <------ | | + | QDR.A | | QDR.B | + | | ------> | | + +---------+ +---------+ + + """ + @classmethod + def get_router(cls, index): + return cls.routers[index] + + @classmethod + def setUpClass(cls): + """Start two routers""" + super(Dispatch1428, cls).setUpClass() + + def router(name, connection): + + config = [ + ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}), + ] + connection + + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=False)) + + cls.routers = [] + a_listener_port = cls.tester.get_port() + b_listener_port = cls.tester.get_port() + + router('A', + [ + ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ]) + router('B', + [ + ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'one', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'two', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}) + ] + ) + sleep(2) + + + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception("%s\n%s" % (e, out)) + return out + + def test_both_link_routes_active(self): + cmds = [ + 'CREATE --type=linkRoute name=foo prefix=foo direction=in connection=one', + 'CREATE --type=linkRoute name=bar prefix=bar direction=in connection=two', + 'CREATE --type=linkRoute name=baz prefix=baz direction=in containerId=QDR.A' + ] + for c in cmds: + self.run_qdmanage(cmd=c, 
address=self.routers[1].addresses[0]) + + first = SendReceive("%s/foo" % self.routers[1].addresses[0], "%s/foo" % self.routers[0].addresses[0]) + first.run() + self.assertEqual(None, first.error) + second = SendReceive("%s/bar" % self.routers[1].addresses[0], "%s/bar" % self.routers[0].addresses[0]) + second.run() + self.assertEqual(None, second.error) + third = SendReceive("%s/baz" % self.routers[1].addresses[0], "%s/baz" % self.routers[0].addresses[0]) + third.run() + self.assertEqual(None, third.error) + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class SendReceive(MessagingHandler): + def __init__(self, send_url, recv_url, message=None): + super(SendReceive, self).__init__() + self.send_url = send_url + self.recv_url = recv_url + self.message = message or Message(body="SendReceiveTest") + self.sent = False + self.error = None + + def close(self): + self.sender.close() + self.receiver.close() + self.sender.connection.close() + self.receiver.connection.close() + + def timeout(self): + self.error = "Timeout Expired - Check for cores" + self.close() + + def stop(self): + self.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + event.container.container_id = "SendReceiveTestClient" + self.sender = event.container.create_sender(self.send_url) + self.receiver = event.container.create_receiver(self.recv_url) + + def on_sendable(self, event): + if not self.sent: + event.sender.send(self.message) + self.sent = True + + def on_message(self, event): + if self.message.body != event.message.body: + self.error = "Incorrect message. 
Got %s, expected %s" % (event.message.body, self.message.body) + + def on_accepted(self, event): + self.stop() + + def run(self): + Container(self).run() if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org