This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit ea32ab17c4d977edcc9f6cf7f1f52565cb7b440a
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Thu Sep 19 17:14:23 2019 +0100

    DISPATCH-1428: allow a route-container connection to be looked up by connector name, even if the container id is the same as a connection from a different connector
---
 src/router_core/route_control.c       |  94 +++++++++++++++---------
 src/router_core/router_core_private.h |   1 +
 tests/system_tests_link_routes.py     | 130 ++++++++++++++++++++++++++++++++++
 3 files changed, 191 insertions(+), 34 deletions(-)

diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 3249148..9ec0023 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -549,22 +549,8 @@ void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *al)
     qdr_core_delete_auto_link(core, al);
 }
 
-
-void qdr_route_connection_opened_CT(qdr_core_t       *core,
-                                    qdr_connection_t *conn,
-                                    qdr_field_t      *container_field,
-                                    qdr_field_t      *connection_field)
+static void activate_route_connection(qdr_core_t *core, qdr_connection_t 
*conn, qdr_conn_identifier_t *cid)
 {
-    if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
-        return;
-
-    qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core,
-            container_field?container_field->iterator:0, 
connection_field?connection_field->iterator:0);
-
-    qdr_add_connection_ref(&cid->connection_refs, conn);
-
-    conn->conn_id        = cid;
-
     //
     // Activate all link-routes associated with this remote container.
     //
@@ -584,24 +570,8 @@ void qdr_route_connection_opened_CT(qdr_core_t       *core,
     }
 }
 
-
-void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
+static void deactivate_route_connection(qdr_core_t *core, qdr_connection_t 
*conn, qdr_conn_identifier_t *cid)
 {
-    //
-    // release any connection-based link routes.  These can exist on
-    // QDR_ROLE_NORMAL connections.
-    //
-    while (DEQ_HEAD(conn->conn_link_routes)) {
-        qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes);
-        // removes the link route from conn->link_routes
-        qdr_route_del_conn_route_CT(core, lr);
-    }
-
-    if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
-        return;
-
-    qdr_conn_identifier_t *cid = conn->conn_id;
-    if (cid) {
         //
         // Deactivate all link-routes associated with this remote container.
         //
@@ -625,9 +595,65 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
         //
         qdr_del_connection_ref(&cid->connection_refs, conn);
 
-        conn->conn_id        = 0;
-
         qdr_route_check_id_for_deletion_CT(core, cid);
+}
+
+void qdr_route_connection_opened_CT(qdr_core_t       *core,
+                                    qdr_connection_t *conn,
+                                    qdr_field_t      *container_field,
+                                    qdr_field_t      *connection_field)
+{
+    if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
+        return;
+
+    if (connection_field) {
+        qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, 0, 
connection_field->iterator);
+        qdr_add_connection_ref(&cid->connection_refs, conn);
+        conn->conn_id = cid;
+        activate_route_connection(core, conn, conn->conn_id);
+        if (container_field) {
+            cid = qdr_route_declare_id_CT(core, container_field->iterator, 0);
+            if (cid != conn->conn_id) {
+                //the connection and container may be indexed to different 
objects if
+                //there are multiple distinctly named connectors which connect 
to the
+                //same amqp container
+                qdr_add_connection_ref(&cid->connection_refs, conn);
+                conn->alt_conn_id = cid;
+                activate_route_connection(core, conn, conn->alt_conn_id);
+            }
+        }
+    } else {
+        qdr_conn_identifier_t *cid = qdr_route_declare_id_CT(core, 
container_field->iterator, 0);
+        qdr_add_connection_ref(&cid->connection_refs, conn);
+        conn->conn_id = cid;
+        activate_route_connection(core, conn, conn->conn_id);
+    }
+}
+
+void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
+{
+    //
+    // release any connection-based link routes.  These can exist on
+    // QDR_ROLE_NORMAL connections.
+    //
+    while (DEQ_HEAD(conn->conn_link_routes)) {
+        qdr_link_route_t *lr = DEQ_HEAD(conn->conn_link_routes);
+        // removes the link route from conn->link_routes
+        qdr_route_del_conn_route_CT(core, lr);
+    }
+
+    if (conn->role != QDR_ROLE_ROUTE_CONTAINER)
+        return;
+
+    qdr_conn_identifier_t *cid = conn->conn_id;
+    if (cid) {
+        deactivate_route_connection(core, conn, cid);
+        conn->conn_id = 0;
+    }
+    cid = conn->alt_conn_id;
+    if (cid) {
+        deactivate_route_connection(core, conn, cid);
+        conn->alt_conn_id = 0;
     }
 }
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index ef137d6..e08c70a 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -635,6 +635,7 @@ struct qdr_connection_t {
     qdr_connection_role_t       role;
     int                         inter_router_cost;
     qdr_conn_identifier_t      *conn_id;
+    qdr_conn_identifier_t      *alt_conn_id;
     bool                        strip_annotations_in;
     bool                        strip_annotations_out;
     bool                        policy_allow_dynamic_link_routes;
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 33658b4..bd204ab 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -2052,6 +2052,136 @@ class InvalidTagTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+class Dispatch1428(TestCase):
+    """
+    Sets up 2 routers (one of which are acting as brokers (QDR.A)).
+
+        QDR.A acting broker #1
+             +---------+         +---------+
+             |         | <------ |         |
+             |  QDR.A  |         |  QDR.B  |
+             |         | ------> |         |
+             +---------+         +---------+
+
+    """
+    @classmethod
+    def get_router(cls, index):
+        return cls.routers[index]
+
+    @classmethod
+    def setUpClass(cls):
+        """Start two routers"""
+        super(Dispatch1428, cls).setUpClass()
+
+        def router(name, connection):
+
+            config = [
+                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
+            ] + connection
+
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=False))
+
+        cls.routers = []
+        a_listener_port = cls.tester.get_port()
+        b_listener_port = cls.tester.get_port()
+
+        router('A',
+               [
+                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': 
a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+               ])
+        router('B',
+               [
+                   ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': 
b_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   ('connector', {'name': 'one', 'role': 'route-container', 
'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}),
+                   ('connector', {'name': 'two', 'role': 'route-container', 
'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'})
+               ]
+               )
+        sleep(2)
+
+
+    def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, 
address=None):
+        p = self.popen(
+            ['qdmanage'] + cmd.split(' ') + ['--bus', address or 
self.address(), '--indent=-1', '--timeout', str(TIMEOUT)],
+            stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect,
+            universal_newlines=True)
+        out = p.communicate(input)[0]
+        try:
+            p.teardown()
+        except Exception as e:
+            raise Exception("%s\n%s" % (e, out))
+        return out
+
+    def test_both_link_routes_active(self):
+        cmds = [
+            'CREATE --type=linkRoute name=foo prefix=foo direction=in 
connection=one',
+            'CREATE --type=linkRoute name=bar prefix=bar direction=in 
connection=two',
+            'CREATE --type=linkRoute name=baz prefix=baz direction=in 
containerId=QDR.A'
+        ]
+        for c in cmds:
+            self.run_qdmanage(cmd=c, address=self.routers[1].addresses[0])
+
+        first = SendReceive("%s/foo" % self.routers[1].addresses[0], "%s/foo" 
% self.routers[0].addresses[0])
+        first.run()
+        self.assertEqual(None, first.error)
+        second = SendReceive("%s/bar" % self.routers[1].addresses[0], "%s/bar" 
% self.routers[0].addresses[0])
+        second.run()
+        self.assertEqual(None, second.error)
+        third = SendReceive("%s/baz" % self.routers[1].addresses[0], "%s/baz" 
% self.routers[0].addresses[0])
+        third.run()
+        self.assertEqual(None, third.error)
+
+class Timeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.timeout()
+
+
+class SendReceive(MessagingHandler):
+    def __init__(self, send_url, recv_url, message=None):
+        super(SendReceive, self).__init__()
+        self.send_url = send_url
+        self.recv_url = recv_url
+        self.message = message or Message(body="SendReceiveTest")
+        self.sent = False
+        self.error = None
+
+    def close(self):
+        self.sender.close()
+        self.receiver.close()
+        self.sender.connection.close()
+        self.receiver.connection.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired - Check for cores"
+        self.close()
+
+    def stop(self):
+        self.close()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer      = event.reactor.schedule(TIMEOUT, Timeout(self))
+        event.container.container_id = "SendReceiveTestClient"
+        self.sender = event.container.create_sender(self.send_url)
+        self.receiver = event.container.create_receiver(self.recv_url)
+
+    def on_sendable(self, event):
+        if not self.sent:
+            event.sender.send(self.message)
+            self.sent = True
+
+    def on_message(self, event):
+        if self.message.body != event.message.body:
+            self.error = "Incorrect message. Got %s, expected %s" % 
(event.message.body, self.message.body)
+
+    def on_accepted(self, event):
+        self.stop()
+
+    def run(self):
+        Container(self).run()
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to