Repository: qpid-dispatch Updated Branches: refs/heads/master 94be97fa5 -> 0138754e5
DISPATCH-1145 - Implemented responsibility (3). Deliveries with Mobile addresses now flow from interior to edge. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/0138754e Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/0138754e Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/0138754e Branch: refs/heads/master Commit: 0138754e5811e761a68024f5c00af13e8d153fd5 Parents: 94be97f Author: Ted Ross <tr...@redhat.com> Authored: Tue Oct 16 11:23:18 2018 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Tue Oct 16 11:23:18 2018 -0400 ---------------------------------------------------------------------- .../modules/edge_router/addr_proxy.c | 22 ++++++++++++++ src/router_core/router_core.c | 19 +++++++------ src/router_core/router_core_private.h | 1 + tests/system_tests_edge_router.py | 30 ++++++++++++++------ 4 files changed, 56 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/modules/edge_router/addr_proxy.c ---------------------------------------------------------------------- diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index d5dc237..34d5697 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -41,6 +41,7 @@ struct qcm_edge_addr_proxy_t { qdrc_event_subscription_t *event_sub; bool uplink_established; qdr_address_t *uplink_addr; + qdr_connection_t *uplink_conn; }; @@ -54,6 +55,15 @@ static qdr_terminus_t *qdr_terminus_edge_downlink(const char *addr) } +static qdr_terminus_t *qdr_terminus_normal(const char *addr) +{ + qdr_terminus_t *term = qdr_terminus(0); + if (addr) + qdr_terminus_set_address(term, addr); + return term; +} + + static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *conn) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context; @@ -64,6 +74,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c // Flag the uplink as being established. // ap->uplink_established = true; + ap->uplink_conn = conn; // // Attach an anonymous sending link to the interior router. @@ -96,6 +107,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c case QDRC_EVENT_CONN_EDGE_LOST : ap->uplink_established = false; + ap->uplink_conn = 0; break; default: @@ -108,6 +120,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) { qcm_edge_addr_proxy_t *ap = (qcm_edge_addr_proxy_t*) context; + qdr_link_t *link; // // If we don't have an established uplink, there is no further work to be done. @@ -124,9 +137,18 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr switch (event) { case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : + link = qdr_create_link_CT(ap->core, ap->uplink_conn, QD_LINK_ENDPOINT, QD_INCOMING, + qdr_terminus_normal(key + 2), qdr_terminus_normal(0)); + qdr_core_bind_address_link_CT(ap->core, addr, link); + addr->edge_inlink = link; + break; case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : + link = addr->edge_inlink; + qdr_core_unbind_address_link_CT(ap->core, addr, link); + qdr_link_outbound_detach_CT(ap->core, link, 0, QDR_CONDITION_NONE, true); + break; default: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index f7ddb5b..b984532 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -429,16 +429,19 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) { - qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); link->owning_addr = 0; - if (DEQ_SIZE(addr->rlinks) == 0) { - const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); - if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY)) - qdr_post_mobile_removed_CT(core, key); - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); - } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0) - qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + if (link->link_direction == QD_OUTGOING) { + qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); + if (DEQ_SIZE(addr->rlinks) == 0) { + const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); + if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE || *key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY)) + qdr_post_mobile_removed_CT(core, key); + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST, addr); + } else if (DEQ_SIZE(addr->rlinks) == 1 && qd_bitmask_cardinality(addr->rnodes) == 0) + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_LOCAL_DEST, addr); + } else + qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index c027dc4..45f4437 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -477,6 +477,7 @@ struct qdr_address_t { qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry qdrc_endpoint_desc_t *core_endpoint; ///< [ref] Set if this address is bound to an in-core endpoint void *core_endpoint_context; + qdr_link_t *edge_inlink; ///< [ref] In-link from connected Interior router (on edge router) qd_address_treatment_t treatment; qdr_forwarder_t *forwarder; int ref_count; ///< Number of link-routes + auto-links referencing this address http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/0138754e/tests/system_tests_edge_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index 803ba6d..b489e2b 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -38,10 +38,10 @@ class RouterTest(TestCase): ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'multiTenant': 'yes'}), ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no', 'role': 'route-container'}), - ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'in', 'containerId': 'LRC'}), - ('linkRoute', {'prefix': '0.0.0.0/link', 'dir': 'out', 'containerId': 'LRC'}), - ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'in'}), - ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'dir': 'out'}), + ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'in', 'containerId': 'LRC'}), + ('linkRoute', {'prefix': '0.0.0.0/link', 'direction': 'out', 'containerId': 'LRC'}), + ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'in'}), + ('autoLink', {'addr': '0.0.0.0/queue.waypoint', 'containerId': 'ALC', 'direction': 'out'}), ('address', {'prefix': 'closest', 'distribution': 'closest'}), ('address', {'prefix': 'spread', 'distribution': 'balanced'}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), @@ -139,7 +139,6 @@ class RouterTest(TestCase): self.assertEqual(None, test.error) def test_11_mobile_address_interior_to_edge(self): - self.skipTest("Temporarily disabled") test = MobileAddressTest(self.routers[2].addresses[0], self.routers[0].addresses[0], "test_11") @@ -335,13 +334,17 @@ class MobileAddressTest(MessagingHandler): self.receiver_conn = None self.sender_conn = None self.receiver = None - self.count = 10 + self.count = 300 + self.rel_count = 50 self.n_rcvd = 0 self.n_sent = 0 + self.n_settled = 0 + self.n_released = 0 self.error = None def timeout(self): - self.error = "Timeout Expired - n_sent=%d n_rcvd=%d addr=%s" % (self.n_sent, self.n_rcvd, self.address) + self.error = "Timeout Expired - n_sent=%d n_rcvd=%d n_settled=%d n_released=%d addr=%s" % \ + (self.n_sent, self.n_rcvd, self.n_settled, self.n_released, self.address) self.receiver_conn.close() self.sender_conn.close() @@ -359,7 +362,18 @@ class MobileAddressTest(MessagingHandler): def on_message(self, event): self.n_rcvd += 1 - if self.n_rcvd == self.count: + + def on_settled(self, event): + self.n_settled += 1 + if self.n_settled == self.count: + self.receiver.close() + for i in range(self.rel_count): + self.sender.send(Message(body="Message %d" % self.n_sent)) + self.n_sent += 1 + + def on_released(self, event): + self.n_released += 1 + if self.n_released == self.rel_count: self.receiver_conn.close() self.sender_conn.close() self.timer.cancel() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org