Repository: qpid-dispatch Updated Branches: refs/heads/master fcbbb5989 -> 532ebae51
DISPATCH-980: add fields to specify prefix to be inserted into- or removed from- remote address on link routes Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/532ebae5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/532ebae5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/532ebae5 Branch: refs/heads/master Commit: 532ebae51a455fba340e9625d71ea62cc64be8f2 Parents: fcbbb59 Author: Gordon Sim <g...@redhat.com> Authored: Fri Apr 27 01:10:34 2018 +0100 Committer: Gordon Sim <g...@redhat.com> Committed: Wed Jun 27 20:41:01 2018 +0100 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 20 ++ python/qpid_dispatch/management/qdrouter.json | 12 + src/router_config.c | 16 ++ src/router_core/agent_config_link_route.c | 22 +- src/router_core/agent_config_link_route.h | 2 +- src/router_core/connections.c | 12 + src/router_core/forwarder.c | 18 ++ src/router_core/route_control.c | 22 ++ src/router_core/route_control.h | 2 + src/router_core/router_core.c | 4 + src/router_core/router_core_private.h | 10 + src/router_core/terminus.c | 28 ++ tests/CMakeLists.txt | 1 + ...tem_tests_link_routes_add_external_prefix.py | 272 +++++++++++++++++++ tools/qdstat | 13 +- 15 files changed, 451 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 8f144b0..17d3002 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -363,6 +363,26 @@ void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term); /** + * qdr_terminus_insert_address_prefix + * + * Insert the given prefix into the terminus address + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @param prefix null-terminated string + */ +void qdr_terminus_insert_address_prefix(qdr_terminus_t *term, const char *prefix); + +/** + * qdr_terminus_strip_address_prefix + * + * Remove the given prefix from the terminus address + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @param prefix null-terminated string + */ +void qdr_terminus_strip_address_prefix(qdr_terminus_t *term, const char *prefix); + +/** * qdr_terminus_dnp_address * * Return the address field in the dynamic-node-properties if it is there. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 42f501f..fa29d94 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1129,6 +1129,18 @@ "create": true, "required": false }, + "addExternalPrefix": { + "type": "string", + "description": "add the specified prefix to the address of the remote terminus on the route container link", + "create": true, + "required": false + }, + "delExternalPrefix": { + "type": "string", + "description": "remove the specified prefix to the address of the remote terminus on the route container link", + "create": true, + "required": false + }, "containerId": { "type": "string", "description": "ContainerID for the target container. Only one of containerId or connection should be specified for a linkRoute. Specifying both will result in the linkRoute not being created.", http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_config.c ---------------------------------------------------------------------- diff --git a/src/router_config.c b/src/router_config.c index 94758df..9ae9f9f 100644 --- a/src/router_config.c +++ b/src/router_config.c @@ -139,6 +139,8 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti char *name = 0; char *prefix = 0; char *pattern = 0; + char *add_prefix= 0; + char *del_prefix= 0; char *container = 0; char *c_name = 0; char *distrib = 0; @@ -153,6 +155,8 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti prefix = qd_entity_opt_string(entity, "prefix", 0); pattern = qd_entity_opt_string(entity, "pattern", 0); + add_prefix= qd_entity_opt_string(entity, "addExternalPrefix", 0); + del_prefix= qd_entity_opt_string(entity, "delExternalPrefix", 0); if (prefix && pattern) { qd_log(router->log_source, QD_LOG_WARNING, @@ -187,6 +191,16 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti qd_compose_insert_string(body, pattern); } + if (add_prefix) { + qd_compose_insert_string(body, "addExternalPrefix"); + qd_compose_insert_string(body, add_prefix); + } + + if (del_prefix) { + qd_compose_insert_string(body, "delExternalPrefix"); + qd_compose_insert_string(body, del_prefix); + } + if (container) { qd_compose_insert_string(body, "containerId"); qd_compose_insert_string(body, container); @@ -215,6 +229,8 @@ qd_error_t qd_router_configure_link_route(qd_router_t *router, qd_entity_t *enti free(name); free(prefix); + free(add_prefix); + free(del_prefix); free(container); free(c_name); free(distrib); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/agent_config_link_route.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_config_link_route.c b/src/router_core/agent_config_link_route.c index 981d142..e94ad23 100644 --- a/src/router_core/agent_config_link_route.c +++ b/src/router_core/agent_config_link_route.c @@ -34,6 +34,8 @@ #define QDR_CONFIG_LINK_ROUTE_DIR 8 #define QDR_CONFIG_LINK_ROUTE_OPER_STATUS 9 #define QDR_CONFIG_LINK_ROUTE_PATTERN 10 +#define QDR_CONFIG_LINK_ROUTE_ADD_EXTERNAL_PREFIX 11 +#define QDR_CONFIG_LINK_ROUTE_DEL_EXTERNAL_PREFIX 12 const char *qdr_config_link_route_columns[] = {"name", @@ -47,6 +49,8 @@ const char *qdr_config_link_route_columns[] = "dir", "operStatus", "pattern", + "addExternalPrefix", + "delExternalPrefix", 0}; const char *CONFIG_LINKROUTE_TYPE = "org.apache.qpid.dispatch.router.config.linkRoute"; @@ -98,6 +102,20 @@ static void qdr_config_link_route_insert_column_CT(qdr_link_route_t *lr, int col qd_compose_insert_null(body); break; + case QDR_CONFIG_LINK_ROUTE_ADD_EXTERNAL_PREFIX: + if (lr->add_prefix) + qd_compose_insert_string(body, lr->add_prefix); + else + qd_compose_insert_null(body); + break; + + case QDR_CONFIG_LINK_ROUTE_DEL_EXTERNAL_PREFIX: + if (lr->del_prefix) + qd_compose_insert_string(body, lr->del_prefix); + else + qd_compose_insert_null(body); + break; + case QDR_CONFIG_LINK_ROUTE_DISTRIBUTION: switch (lr->treatment) { case QD_TREATMENT_LINK_BALANCED: text = "linkBalanced"; break; @@ -392,6 +410,8 @@ void qdra_config_link_route_create_CT(qdr_core_t *core, // qd_parsed_field_t *prefix_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_PREFIX]); qd_parsed_field_t *pattern_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_PATTERN]); + qd_parsed_field_t *add_prefix_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_ADD_EXTERNAL_PREFIX]); + qd_parsed_field_t *del_prefix_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DEL_EXTERNAL_PREFIX]); qd_parsed_field_t *distrib_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DISTRIBUTION]); qd_parsed_field_t *connection_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONNECTION]); qd_parsed_field_t *container_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONTAINER_ID]); @@ -456,7 +476,7 @@ void qdra_config_link_route_create_CT(qdr_core_t *core, // The request is good. Create the entity. // - lr = qdr_route_add_link_route_CT(core, name, prefix_field, pattern_field, container_field, connection_field, trt, dir); + lr = qdr_route_add_link_route_CT(core, name, prefix_field, pattern_field, add_prefix_field, del_prefix_field, container_field, connection_field, trt, dir); // // Compose the result map for the response. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/agent_config_link_route.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_config_link_route.h b/src/router_core/agent_config_link_route.h index 57f6f0d..465fd22 100644 --- a/src/router_core/agent_config_link_route.h +++ b/src/router_core/agent_config_link_route.h @@ -33,7 +33,7 @@ void qdra_config_link_route_get_CT(qdr_core_t *core, qdr_query_t *query, const char *qdr_config_link_route_columns[]); -#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 11 +#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 13 const char *qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT + 1]; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 5fdc3bf..8f7c196 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -844,6 +844,8 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li free(link->disambiguated_name); free(link->terminus_addr); free(link->ingress_histogram); + free(link->insert_prefix); + free(link->strip_prefix); link->name = 0; } @@ -875,6 +877,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8); link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; + link->insert_prefix = 0; + link->strip_prefix = 0; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -1602,6 +1606,14 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac // Handle attach-routed links // if (link->connected_link) { + qdr_terminus_t *remote_terminus = link->link_direction == QD_OUTGOING ? target : source; + if (link->strip_prefix) { + qdr_terminus_strip_address_prefix(remote_terminus, link->strip_prefix); + } + if (link->insert_prefix) { + qdr_terminus_insert_address_prefix(remote_terminus, link->insert_prefix); + } + qdr_link_outbound_second_attach_CT(core, link->connected_link, source, target); return; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 0557d01..7003788 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -703,12 +703,23 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, { qdr_connection_ref_t *conn_ref = DEQ_HEAD(addr->conns); qdr_connection_t *conn = 0; + char *strip = 0; + char *insert = 0; // // Check for locally connected containers that can handle this link attach. // if (conn_ref) { conn = conn_ref->conn; + qdr_terminus_t *remote_terminus = in_link->link_direction == QD_OUTGOING ? source : target; + if (addr->del_prefix) { + insert = strdup(addr->del_prefix); + qdr_terminus_strip_address_prefix(remote_terminus, addr->del_prefix); + } + if (addr->add_prefix) { + strip = strdup(addr->add_prefix); + qdr_terminus_insert_address_prefix(remote_terminus, addr->add_prefix); + } // // If there are more than one local connections available for handling this link, @@ -767,6 +778,13 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, out_link->admin_enabled = true; out_link->terminus_addr = 0; + if (strip) { + out_link->strip_prefix = strip; + } + if (insert) { + out_link->insert_prefix = insert; + } + out_link->oper_status = QDR_LINK_OPER_DOWN; out_link->name = strdup(in_link->disambiguated_name ? in_link->disambiguated_name : in_link->name); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 61f426d..53c0b53 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -270,6 +270,8 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_iterator_t *name, qd_parsed_field_t *prefix_field, qd_parsed_field_t *pattern_field, + qd_parsed_field_t *add_prefix_field, + qd_parsed_field_t *del_prefix_field, qd_parsed_field_t *container_field, qd_parsed_field_t *connection_field, qd_address_treatment_t treatment, @@ -308,6 +310,18 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, lr->is_prefix = is_prefix; lr->pattern = pattern; + if (!!add_prefix_field) { + qd_iterator_t *ap_iter = qd_parse_raw(add_prefix_field); + int ap_len = qd_iterator_length(ap_iter); + lr->add_prefix = malloc(ap_len + 1); + qd_iterator_strncpy(ap_iter, lr->add_prefix, ap_len + 1); + } + if (!!del_prefix_field) { + qd_iterator_t *ap_iter = qd_parse_raw(del_prefix_field); + int ap_len = qd_iterator_length(ap_iter); + lr->del_prefix = malloc(ap_len + 1); + qd_iterator_strncpy(ap_iter, lr->del_prefix, ap_len + 1); + } // // Add the address to the routing hash table and map it as a pattern in the // wildcard pattern parse tree @@ -318,6 +332,14 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_hash_retrieve(core->addr_hash, a_iter, (void*) &lr->addr); if (!lr->addr) { lr->addr = qdr_address_CT(core, treatment); + if (lr->add_prefix) { + lr->addr->add_prefix = (char*) malloc(strlen(lr->add_prefix) + 1); + strcpy(lr->addr->add_prefix, lr->add_prefix); + } + if (lr->del_prefix) { + lr->addr->del_prefix = (char*) malloc(strlen(lr->del_prefix) + 1); + strcpy(lr->addr->del_prefix, lr->del_prefix); + } //treatment will not be undefined for link route so above will not return null DEQ_INSERT_TAIL(core->addrs, lr->addr); qd_hash_insert(core->addr_hash, a_iter, lr->addr, &lr->addr->hash_handle); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/route_control.h ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h index 3c715bc..291766c 100644 --- a/src/router_core/route_control.h +++ b/src/router_core/route_control.h @@ -25,6 +25,8 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t *core, qd_iterator_t *name, qd_parsed_field_t *prefix_field, qd_parsed_field_t *pattern_field, + qd_parsed_field_t *add_prefix_field, + qd_parsed_field_t *del_prefix_field, qd_parsed_field_t *container_field, qd_parsed_field_t *connection_field, qd_address_treatment_t treatment, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 9d7f7a5..7b198ef 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -295,6 +295,8 @@ qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment addr->treatment = treatment; addr->forwarder = qdr_forwarder_CT(core, treatment); addr->rnodes = qd_bitmask(0); + addr->add_prefix = 0; + addr->del_prefix = 0; return addr; } @@ -334,6 +336,8 @@ bool qdr_is_addr_treatment_multicast(qdr_address_t *addr) void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr) { DEQ_REMOVE(core->link_routes, lr); + free(lr->add_prefix); + free(lr->del_prefix); free(lr->name); free(lr->pattern); free_qdr_link_route_t(lr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index b6775c7..3f80ba4 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -406,6 +406,8 @@ struct qdr_link_t { bool strip_annotations_out; bool drain_mode; bool stalled_outbound; ///< Indicates that this link is stalled on outbound buffer backpressure + char *strip_prefix; + char *insert_prefix; uint64_t total_deliveries; uint64_t presettled_deliveries; @@ -476,6 +478,12 @@ struct qdr_address_t { // qdr_exchange_t *exchange; // weak ref + // + // State for "link balanced" treatment + // + char *add_prefix; + char *del_prefix; + /**@name Statistics */ ///@{ uint64_t deliveries_ingress; @@ -590,6 +598,8 @@ struct qdr_link_route_t { bool active; bool is_prefix; char *pattern; + char *add_prefix; + char *del_prefix; }; ALLOC_DECLARE(qdr_link_route_t); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/src/router_core/terminus.c ---------------------------------------------------------------------- diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 4c0e0a3..30623d1 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -187,6 +187,34 @@ qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term) return term->address->iterator; } +void qdr_terminus_insert_address_prefix(qdr_terminus_t *term, const char *prefix) +{ + qd_iterator_t *orig = qdr_terminus_get_address(term); + char *rewrite_addr = 0; + + size_t prefix_len = strlen(prefix); + size_t orig_len = qd_iterator_length(orig); + rewrite_addr = malloc(prefix_len + orig_len + 1); + strcpy(rewrite_addr, prefix); + qd_iterator_strncpy(orig, rewrite_addr+prefix_len, orig_len + 1); + + qdr_terminus_set_address(term, rewrite_addr); + free(rewrite_addr); +} + +void qdr_terminus_strip_address_prefix(qdr_terminus_t *term, const char *prefix) +{ + qd_iterator_t *orig = qdr_terminus_get_address(term); + size_t prefix_len = strlen(prefix); + size_t orig_len = qd_iterator_length(orig); + if (orig_len > prefix_len && qd_iterator_prefix(orig, prefix)) { + char *rewrite_addr = malloc(orig_len + 1); + qd_iterator_strncpy(orig, rewrite_addr, orig_len + 1); + qdr_terminus_set_address(term, rewrite_addr + prefix_len); + free(rewrite_addr); + } +} + qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 315693e..d043694 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -78,6 +78,7 @@ endif() foreach(py_test_module # system_tests_broker system_tests_link_routes + system_tests_link_routes_add_external_prefix system_tests_autolinks system_tests_drain system_tests_management http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/tests/system_tests_link_routes_add_external_prefix.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes_add_external_prefix.py b/tests/system_tests_link_routes_add_external_prefix.py new file mode 100644 index 0000000..0bb6b39 --- /dev/null +++ b/tests/system_tests_link_routes_add_external_prefix.py @@ -0,0 +1,272 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest2 as unittest +from time import sleep, time +from subprocess import PIPE, STDOUT + +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process + +from proton import Message +from proton.handlers import MessagingHandler +from proton.reactor import Container +import json + +def parse_record(fields, line): + return [line[f[0]:f[1]].strip() for f in fields] + +def parse_fields(header, items): + pos = [header.find(name) for name in header.split()] + [len(header)] + fields = zip(pos, pos[1:]) + return [parse_record(fields, item) for item in items] + +class LinkRouteTest(TestCase): + """ + Tests the addExternalPrefix property of the linkRoute entity on the dispatch router. + + Sets up 3 routers (one of which are acting as brokers (QDR.A)). The other two routers have linkRoutes + configured such that matching traffic will be directed to/from the 'fake' broker. + + QDR.A acting broker #1 + +---------+ +---------+ +---------+ + | | <------ | | <----- | | + | QDR.A | | QDR.B | | QDR.C | + | | ------> | | ------> | | + +---------+ +---------+ +---------+ + + """ + @classmethod + def get_router(cls, index): + return cls.routers[index] + + @classmethod + def setUpClass(cls): + """Start three routers""" + super(LinkRouteTest, cls).setUpClass() + + def router(name, connection): + + config = [ + ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}), + ] + connection + + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=False)) + + cls.routers = [] + a_listener_port = cls.tester.get_port() + b_listener_port = cls.tester.get_port() + c_listener_port = cls.tester.get_port() + + router('A', + [ + ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ]) + router('B', + [ + ('listener', {'role': 'normal', 'host': '0.0.0.0', 'port': b_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'broker', 'role': 'route-container', 'host': '0.0.0.0', 'port': a_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + ('connector', {'name': 'routerC', 'role': 'inter-router', 'host': '0.0.0.0', 'port': c_listener_port}), + + ('linkRoute', {'prefix': 'foo', 'containerId': 'QDR.A', 'direction': 'in', 'addExternalPrefix':'bar.'}), + ('linkRoute', {'prefix': 'foo', 'containerId': 'QDR.A', 'direction': 'out', 'addExternalPrefix':'bar.'}), + + ('linkRoute', {'prefix': 'qdr-a', 'containerId': 'QDR.A', 'direction': 'in', 'delExternalPrefix':'qdr-a.'}), + ('linkRoute', {'prefix': 'qdr-a', 'containerId': 'QDR.A', 'direction': 'out', 'delExternalPrefix':'qdr-a.'}) + + ] + ) + router('C', + [ + ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': cls.tester.get_port(), 'saslMechanisms': 'ANONYMOUS'}), + ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port': c_listener_port, 'saslMechanisms': 'ANONYMOUS'}), + + ('linkRoute', {'prefix': 'foo', 'direction': 'in', 'addExternalPrefix':'bar.'}), + ('linkRoute', {'prefix': 'foo', 'direction': 'out', 'addExternalPrefix':'bar.'}), + + ('linkRoute', {'prefix': 'qdr-a', 'direction': 'in', 'delExternalPrefix':'qdr-a.'}), + ('linkRoute', {'prefix': 'qdr-a', 'direction': 'out', 'delExternalPrefix':'qdr-a.'}) + ] + ) + + # Wait for the routers to locate each other, and for route propagation + # to settle + cls.routers[1].wait_router_connected('QDR.C') + cls.routers[2].wait_router_connected('QDR.B') + cls.routers[2].wait_address("foo", remotes=1, delay=0.5) + + # This is not a classic router network in the sense that QDR.A is acting as a broker. We allow a little + # bit more time for the routers to stabilize. + sleep(2) + + def run_qdstat_linkRoute(self, address, args=None): + cmd = ['qdstat', '--bus', str(address), '--timeout', str(TIMEOUT) ] + ['--linkroute'] + if args: + cmd = cmd + args + p = self.popen( + cmd, + name='qdstat-'+self.id(), stdout=PIPE, expect=None, + universal_newlines=True) + + out = p.communicate()[0] + assert p.returncode == 0, "qdstat exit status %s, output:\n%s" % (p.returncode, out) + return out + + def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, address=None): + p = self.popen( + ['qdmanage'] + cmd.split(' ') + ['--bus', address or self.address(), '--indent=-1', '--timeout', str(TIMEOUT)], + stdin=PIPE, stdout=PIPE, stderr=STDOUT, expect=expect, + universal_newlines=True) + out = p.communicate(input)[0] + try: + p.teardown() + except Exception as e: + raise Exception("%s\n%s" % (e, out)) + return out + + def test_qdstat_link_routes_on_B(self): + output = self.run_qdstat_linkRoute(self.routers[1].addresses[0]) + lines = output.split("\n") + self.assertEqual(len(lines), 8) # 4 links, 3 lines of header and an empty line at the end + header = lines[1] + columns = header.split() + self.assertEqual(len(columns), 6) + self.assertEqual(columns[4], "add-ext-prefix") + self.assertEqual(columns[5], "del-ext-prefix") + linkroutes = parse_fields(header, lines[3:7]) + self.assertEqual(linkroutes[0][0], "foo") + self.assertEqual(linkroutes[0][1], "in") + self.assertEqual(linkroutes[0][4], "bar.") + self.assertEqual(linkroutes[1][0], "foo") + self.assertEqual(linkroutes[1][1], "out") + self.assertEqual(linkroutes[1][4], "bar.") + self.assertEqual(linkroutes[2][0], "qdr-a") + self.assertEqual(linkroutes[2][1], "in") + self.assertEqual(linkroutes[2][5], "qdr-a.") + self.assertEqual(linkroutes[3][0], "qdr-a") + self.assertEqual(linkroutes[3][1], "out") + self.assertEqual(linkroutes[3][5], "qdr-a.") + + def test_qdstat_link_routes_on_C(self): + output = self.run_qdmanage('QUERY --type=org.apache.qpid.dispatch.router.config.linkRoute', address=self.routers[2].addresses[0]) + objects = json.loads(output) + self.assertEqual(len(objects), 4) + index = {} + for o in objects: + index["%s-%s" % (o["prefix"], o["direction"])] = o + self.assertEqual(index["foo-in"]["addExternalPrefix"], "bar.") + self.assertEqual(index["foo-out"]["addExternalPrefix"], "bar.") + self.assertEqual(index["qdr-a-in"]["delExternalPrefix"], "qdr-a.") + self.assertEqual(index["qdr-a-out"]["delExternalPrefix"], "qdr-a.") + + def test_route_sender_add_prefix_on_B(self): + test = SendReceive("%s/foo" % self.routers[1].addresses[0], "%s/bar.foo" % self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_receiver_add_prefix_on_B(self): + test = SendReceive("%s/bar.foo" % self.routers[0].addresses[0], "%s/foo" % self.routers[1].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_sender_add_prefix_on_C(self): + test = SendReceive("%s/foo" % self.routers[2].addresses[0], "%s/bar.foo" % self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_receiver_add_prefix_on_C(self): + test = SendReceive("%s/bar.foo" % self.routers[0].addresses[0], "%s/foo" % self.routers[2].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_sender_del_prefix_on_B(self): + test = SendReceive("%s/qdr-a.baz" % self.routers[1].addresses[0], "%s/baz" % self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_receiver_del_prefix_on_B(self): + test = SendReceive("%s/baz" % self.routers[0].addresses[0], "%s/qdr-a.baz" % self.routers[1].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_sender_del_prefix_on_C(self): + test = SendReceive("%s/qdr-a.baz" % self.routers[2].addresses[0], "%s/baz" % self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + + def test_route_receiver_del_prefix_on_C(self): + test = SendReceive("%s/baz" % self.routers[0].addresses[0], "%s/qdr-a.baz" % self.routers[2].addresses[0]) + test.run() + self.assertEqual(None, test.error) + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class SendReceive(MessagingHandler): + def __init__(self, send_url, recv_url, message=None): + super(SendReceive, self).__init__() + self.send_url = send_url + self.recv_url = recv_url + self.message = message or Message(body="SendReceiveTest") + self.sent = False + self.error = None + + def close(self): + self.sender.close() + self.receiver.close() + self.sender.connection.close() + self.receiver.connection.close() + + def timeout(self): + self.error = "Timeout Expired - Check for cores" + self.close() + + def stop(self): + self.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + event.container.container_id = "SendReceiveTestClient" + self.sender = event.container.create_sender(self.send_url) + self.receiver = event.container.create_receiver(self.recv_url) + + def on_sendable(self, event): + if not self.sent: + event.sender.send(self.message) + self.sent = True + + def on_message(self, event): + if self.message.body != event.message.body: + self.error = "Incorrect message. Got %s, expected %s" % (event.message.body, self.message.body) + + def on_accepted(self, event): + self.stop() + + def run(self): + Container(self).run() + + +if __name__ == '__main__': + unittest.main(main_module()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/532ebae5/tools/qdstat ---------------------------------------------------------------------- diff --git a/tools/qdstat b/tools/qdstat index 1e3a3a3..21ef985 100755 --- a/tools/qdstat +++ b/tools/qdstat @@ -476,18 +476,29 @@ class BusManager(Node): heads.append(Header("distrib")) heads.append(Header("status")) rows = [] - cols = ('prefix', 'direction', 'distribution', 'operStatus', 'pattern') + cols = ('prefix', 'direction', 'distribution', 'operStatus', 'pattern', 'addExternalPrefix', 'delExternalPrefix') link_routes = self.query('org.apache.qpid.dispatch.router.config.linkRoute', cols, limit=self.opts.limit) + have_add_del_prefix = False for link_route in link_routes: row = [] row.append(link_route.prefix if link_route.prefix else link_route.pattern) row.append(link_route.direction) row.append(link_route.distribution) row.append(link_route.operStatus) + try: + if link_route.addExternalPrefix or link_route.delExternalPrefix: + row.append(link_route.addExternalPrefix) + row.append(link_route.delExternalPrefix) + have_add_del_prefix = True + except KeyError: + pass # added post 1.1.0 rows.append(row) title = "Link Routes" + if have_add_del_prefix: + heads.append(Header("add-ext-prefix")) + heads.append(Header("del-ext-prefix")) sorter = Sorter(heads, rows, 'address', 0, True) dispRows = sorter.getSorted() disp.formattedTable(title, heads, dispRows) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org