This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new b8613b8 DISPATCH-1224 - Completed endpoint-initiated waypoints. b8613b8 is described below commit b8613b8a64353ee021e218c8c487a9871517966c Author: Ted Ross <tr...@redhat.com> AuthorDate: Fri Dec 14 17:38:08 2018 -0500 DISPATCH-1224 - Completed endpoint-initiated waypoints. DISPATCH-1224 - Added a waypoint and multi-phase test. DISPATCH-1224 - Replaced use of sprintf with explicit characters. --- include/qpid/dispatch/amqp.h | 9 + include/qpid/dispatch/router_core.h | 11 + python/qpid_dispatch/management/qdrouter.json | 7 + .../qpid_dispatch_internal/policy/policy_local.py | 6 +- src/amqp.c | 19 +- src/policy.c | 41 ++ src/policy.h | 1 + src/router_core/connections.c | 2 +- .../modules/address_lookup_client/lookup_client.c | 7 +- src/router_core/modules/edge_router/addr_proxy.c | 53 ++- src/router_core/route_control.c | 2 + src/router_core/router_core.c | 4 +- src/router_core/router_core_private.h | 1 + src/router_core/terminus.c | 20 + tests/CMakeLists.txt | 1 + tests/system_tests_multi_phase.py | 411 +++++++++++++++++++++ 16 files changed, 574 insertions(+), 21 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 35f7c67..9f6be37 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -127,7 +127,16 @@ extern const char * const QD_CAPABILITY_ANONYMOUS_RELAY; extern const char * const QD_CAPABILITY_ROUTER_CONTROL; extern const char * const QD_CAPABILITY_ROUTER_DATA; extern const char * const QD_CAPABILITY_EDGE_DOWNLINK; +extern const char * const QD_CAPABILITY_WAYPOINT_DEFAULT; extern const char * const QD_CAPABILITY_WAYPOINT1; +extern const char * const QD_CAPABILITY_WAYPOINT2; +extern const char * const QD_CAPABILITY_WAYPOINT3; +extern const char * const QD_CAPABILITY_WAYPOINT4; +extern const char * const QD_CAPABILITY_WAYPOINT5; +extern const char * const QD_CAPABILITY_WAYPOINT6; +extern const char * const QD_CAPABILITY_WAYPOINT7; +extern const char * const QD_CAPABILITY_WAYPOINT8; +extern const char * const QD_CAPABILITY_WAYPOINT9; /// @} /** @name Dynamic Node Properties */ diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index a03db27..136ad6a 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -316,6 +316,17 @@ void qdr_terminus_add_capability(qdr_terminus_t *term, const char *capability); bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability); /** + * qdr_terminus_waypoint_capability + * + * If the terminus has a waypoint capability, return the ordinal of the + * waypoint. If not, return zero. + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @return 1..9 if the terminus has waypoint capability, 0 otherwise + */ +int qdr_terminus_waypoint_capability(qdr_terminus_t *term); + +/** * qdr_terminus_is_anonymous * * Indicate whether this terminus represents an anonymous endpoint. diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 5408c9b..180a0e0 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1868,6 +1868,13 @@ "required": false, "create": true }, + "allowWaypointLinks": { + "type": "boolean", + "description": "Whether this connection is allowed to claim 'waypoint.N' capability for attached links. This allows endpoints to act as waypoints without needing auto-links.", + "default": true, + "required": false, + "create": true + }, "sources": { "type": "string", "description": "A list of source addresses from which users in this group may receive messages. To specify multiple addresses, separate the addresses with either a comma or a space. If you do not specify any addresses, users in this group are not allowed to receive messages from any addresses. You can use the substitution token '${user}' to specify an address that contains a user's authenticated user name. You can use an asterisk ('*') wildcard to match one or more ch [...] diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index 9a5a1d6..2f3cfb4 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -70,6 +70,7 @@ class PolicyKeys(object): KW_ALLOW_DYNAMIC_SRC = "allowDynamicSource" KW_ALLOW_ANONYMOUS_SENDER = "allowAnonymousSender" KW_ALLOW_USERID_PROXY = "allowUserIdProxy" + KW_ALLOW_WAYPOINT_LINKS = "allowWaypointLinks" KW_SOURCES = "sources" KW_TARGETS = "targets" KW_SOURCE_PATTERN = "sourcePattern" @@ -143,6 +144,7 @@ class PolicyCompiler(object): PolicyKeys.KW_ALLOW_DYNAMIC_SRC, PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER, PolicyKeys.KW_ALLOW_USERID_PROXY, + PolicyKeys.KW_ALLOW_WAYPOINT_LINKS, PolicyKeys.KW_SOURCES, PolicyKeys.KW_TARGETS, PolicyKeys.KW_SOURCE_PATTERN, @@ -243,6 +245,7 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False + policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True policy_out[PolicyKeys.KW_SOURCES] = '' policy_out[PolicyKeys.KW_TARGETS] = '' policy_out[PolicyKeys.KW_SOURCE_PATTERN] = '' @@ -278,7 +281,8 @@ class PolicyCompiler(object): policy_out[key] = val_out elif key in [PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER, PolicyKeys.KW_ALLOW_DYNAMIC_SRC, - PolicyKeys.KW_ALLOW_USERID_PROXY + PolicyKeys.KW_ALLOW_USERID_PROXY, + PolicyKeys.KW_ALLOW_WAYPOINT_LINKS ]: if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and val.lower() in ['true', 'false']: val = True if val == 'true' else False diff --git a/src/amqp.c b/src/amqp.c index c1a0f11..9164d60 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -32,11 +32,20 @@ const int QD_MA_MAX_KEY_LEN = 16; const int QD_MA_N_KEYS = 4; // max number of router annotations to send/receive const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to search for stripping -const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router"; -const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data"; -const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink"; -const char * const QD_CAPABILITY_WAYPOINT1 = "qd.waypoint.1"; -const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY"; +const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router"; +const char * const QD_CAPABILITY_ROUTER_DATA = "qd.router-data"; +const char * const QD_CAPABILITY_EDGE_DOWNLINK = "qd.router-edge-downlink"; +const char * const QD_CAPABILITY_WAYPOINT_DEFAULT = "qd.waypoint"; +const char * const QD_CAPABILITY_WAYPOINT1 = "qd.waypoint.1"; +const char * const QD_CAPABILITY_WAYPOINT2 = "qd.waypoint.2"; +const char * const QD_CAPABILITY_WAYPOINT3 = "qd.waypoint.3"; +const char * const QD_CAPABILITY_WAYPOINT4 = "qd.waypoint.4"; +const char * const QD_CAPABILITY_WAYPOINT5 = "qd.waypoint.5"; +const char * const QD_CAPABILITY_WAYPOINT6 = "qd.waypoint.6"; +const char * const QD_CAPABILITY_WAYPOINT7 = "qd.waypoint.7"; +const char * const QD_CAPABILITY_WAYPOINT8 = "qd.waypoint.8"; +const char * const QD_CAPABILITY_WAYPOINT9 = "qd.waypoint.9"; +const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY"; const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address"; diff --git a/src/policy.c b/src/policy.c index 636292e..910a93b 100644 --- a/src/policy.c +++ b/src/policy.c @@ -437,6 +437,7 @@ bool qd_policy_open_lookup_user( settings->allowDynamicSource = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false); } settings->allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false); + settings->allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true); if (settings->sources == 0) { //don't override if configured by authz plugin settings->sources = qd_entity_get_string((qd_entity_t*)upolicy, "sources"); } @@ -873,6 +874,24 @@ bool _qd_policy_approve_link_name_tree(const char *username, const char *allowed } +static bool qd_policy_terminus_is_waypoint(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + if (cap) { + pn_data_rewind(cap); + pn_data_next(cap); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + size_t len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT); + if (sym.size >= len && strncmp(sym.start, QD_CAPABILITY_WAYPOINT_DEFAULT, len) == 0) + return true; + } + } + + return false; +} + + bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_conn) { const char *hostip = qd_connection_remote_ip(qd_conn); @@ -897,6 +916,17 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ bool lookup; if (target && *target) { // a target is specified + if (!qd_conn->policy_settings->allowWaypointLinks) { + bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_target(pn_link)); + if (waypoint) { + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, + "[%"PRIu64"]: DENY AMQP Attach sender link '%s' for user '%s', rhost '%s', vhost '%s'. Waypoint capability not permitted", + qd_conn->connection_id, target, qd_conn->user_id, hostip, vhost); + _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, QD_AMQP_COND_UNAUTHORIZED_ACCESS); + return false; + } + } + lookup = qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings, target, false); qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), @@ -959,6 +989,17 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q const char * source = pn_terminus_get_address(pn_link_remote_source(pn_link)); if (source && *source) { // a source is specified + if (!qd_conn->policy_settings->allowWaypointLinks) { + bool waypoint = qd_policy_terminus_is_waypoint(pn_link_remote_source(pn_link)); + if (waypoint) { + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, + "[%"PRIu64"]: DENY AMQP Attach receiver link '%s' for user '%s', rhost '%s', vhost '%s'. Waypoint capability not permitted", + qd_conn->connection_id, source, qd_conn->user_id, hostip, vhost); + _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, QD_AMQP_COND_UNAUTHORIZED_ACCESS); + return false; + } + } + bool lookup = qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings, source, true); qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), diff --git a/src/policy.h b/src/policy.h index a1d415a..5f7af74 100644 --- a/src/policy.h +++ b/src/policy.h @@ -51,6 +51,7 @@ struct qd_policy__settings_s { bool allowDynamicSource; bool allowAnonymousSender; bool allowUserIdProxy; + bool allowWaypointLinks; char *sources; char *targets; char *sourcePattern; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index a810a1e..d6ced98 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -387,7 +387,7 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link) int qdr_link_phase(const qdr_link_t *link) { - return link && link->auto_link ? link->auto_link->phase : 0; + return link ? link->phase : 0; } diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c index b457304..039387b 100644 --- a/src/router_core/modules/address_lookup_client/lookup_client.c +++ b/src/router_core/modules/address_lookup_client/lookup_client.c @@ -252,9 +252,10 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, // // If the terminus has a waypoint capability, override the configured phases and use the waypoint phases. // - if (qdr_terminus_has_capability(terminus, QD_CAPABILITY_WAYPOINT1)) { - in_phase = 1; - out_phase = 0; + int waypoint_ordinal = qdr_terminus_waypoint_capability(terminus); + if (waypoint_ordinal > 0) { + in_phase = waypoint_ordinal; + out_phase = waypoint_ordinal - 1; } qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index a6c409e..c73dfc0 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -89,6 +89,41 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr) } +static void set_waypoint_capability(qdr_terminus_t *term, char phase_char, qd_direction_t dir, int in_phase, int out_phase) +{ + int phase = (int) (phase_char - '0'); + + // + // For links that are outgoing on the in_phase or incoming on the out_phase, don't set the + // waypoint capability. These links will behave like normal client links. + // + if ((dir == QD_OUTGOING && phase == in_phase) || + (dir == QD_INCOMING && phase == out_phase)) + return; + + // + // If the phase is outside the range of in_phase..out_phase, don't do anything. This is a + // misconfiguration. + // + if (phase < in_phase || phase > out_phase) + return; + + // + // In all remaining cases, the new links are acting as waypoints. + // + int ordinal = phase + (dir == QD_OUTGOING ? 0 : 1); + char cap[16]; + char suffix[3]; + + strcpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT); + suffix[0] = '.'; + suffix[1] = '0' + ordinal; + suffix[2] = '\0'; + strcat(cap, suffix); + qdr_terminus_add_capability(term, cap); +} + + static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t *addr) { if (addr->edge_inlink == 0) { @@ -96,13 +131,12 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t if (addr->config && addr->config->out_phase > 0) { // - // If this address is configured as multi-phase, check to see if it is - // an inlink on phase-0. If so, tell the Interior peer that this is - // for a waypoint. + // If this address is configured as multi-phase, we may need to + // add waypoint capabilities to the terminus. // const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle); - if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '0') - qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1); + if (key[0] == QD_ITER_HASH_PREFIX_MOBILE) + set_waypoint_capability(term, key[1], QD_INCOMING, addr->config->in_phase, addr->config->out_phase); } qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING, @@ -136,13 +170,12 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_ if (addr->config && addr->config->out_phase > 0) { // - // If this address is configured as multi-phase, check to see if it is - // an outlink on phase-1. If so, tell the Interior peer that this is - // for a waypoint. + // If this address is configured as multi-phase, we may need to + // add waypoint capabilities to the terminus. // const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle); - if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '1') - qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1); + if (key[0] == QD_ITER_HASH_PREFIX_MOBILE) + set_waypoint_capability(term, key[1], QD_OUTGOING, addr->config->in_phase, addr->config->out_phase); } qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING, diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index b843436..30cfb44 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -255,6 +255,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase) al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target); al->link->auto_link = al; + al->link->phase = al->phase; al->state = QDR_AUTO_LINK_STATE_ATTACHING; } else { @@ -288,6 +289,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, qdr_auto_link_t *al, q if (al->link) { qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE, true); al->link->auto_link = 0; + al->link->phase = 0; al->link = 0; } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 3f21290..28649ad 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -453,12 +453,14 @@ void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link) { + const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); link->owning_addr = addr; + if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE)) + link->phase = (int) (key[1] - '0'); if (link->link_direction == QD_OUTGOING) { qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (DEQ_SIZE(addr->rlinks) == 1) { - const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == QD_ITER_HASH_PREFIX_MOBILE)) qdr_post_mobile_added_CT(core, key, addr->treatment); qdr_addr_start_inlinks_CT(core, addr); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 289614a..a2ebc8e 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -422,6 +422,7 @@ struct qdr_link_t { int attach_count; ///< 1 or 2 depending on the state of the lifecycle int detach_count; ///< 0, 1, or 2 depending on the state of the lifecycle qdr_address_t *owning_addr; ///< [ref] Address record that owns this link + int phase; qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link qdrc_endpoint_t *core_endpoint; ///< [ref] Set if this link terminates on an in-core endpoint qdr_link_ref_t *ref[QDR_LINK_LIST_CLASSES]; ///< Pointers to containing reference objects diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 1486148..58ec9e5 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -128,6 +128,26 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability) } +int qdr_terminus_waypoint_capability(qdr_terminus_t *term) +{ + pn_data_t *cap = term->capabilities; + pn_data_rewind(cap); + pn_data_next(cap); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + size_t len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT); + if (sym.size >= len && strncmp(sym.start, QD_CAPABILITY_WAYPOINT_DEFAULT, len) == 0) { + if (sym.size == len) + return 1; // This is the default ordinal + if (sym.size == len + 2 && sym.start[len + 1] > '0' && sym.start[len + 1] <= '9') + return (int) (sym.start[len + 1] - '0'); + } + } + + return 0; +} + + bool qdr_terminus_is_anonymous(qdr_terminus_t *term) { return term == 0 || (term->address == 0 && !term->dynamic); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9c68325..6eb0547 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -126,6 +126,7 @@ foreach(py_test_module system_tests_priority system_tests_core_client system_tests_address_lookup + system_tests_multi_phase ) add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module}) diff --git a/tests/system_tests_multi_phase.py b/tests/system_tests_multi_phase.py new file mode 100644 index 0000000..27043d0 --- /dev/null +++ b/tests/system_tests_multi_phase.py @@ -0,0 +1,411 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from time import sleep +from threading import Event +from threading import Timer + +import unittest2 as unittest +from proton import Message, Timeout, symbol +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_test import QdManager +from system_tests_link_routes import ConnLinkRouteService +from proton.handlers import MessagingHandler +from proton.reactor import Container, DynamicNodeProperties +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node +from subprocess import PIPE, STDOUT +import re + + +class AddrTimer(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.check_address() + + +class RouterTest(TestCase): + + inter_router_port = None + + @classmethod + def setUpClass(cls): + """Start a router""" + super(RouterTest, cls).setUpClass() + + def router(name, mode, connection, extra=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), + ('address', {'prefix': 'queue', 'waypoint': 'yes'}), + ('address', {'prefix': 'multi', 'ingressPhase': '0', 'egressPhase': '9'}), + connection + ] + + if extra: + config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + edge_port_A = cls.tester.get_port() + edge_port_B = cls.tester.get_port() + + router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_A})) + router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_B})) + router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EA2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EB1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + router('EB2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + + cls.routers[0].wait_router_connected('INT.B') + cls.routers[1].wait_router_connected('INT.A') + + + def test_01_waypoint_same_interior(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'queue.01') + test.run() + self.assertEqual(None, test.error) + + def test_02_waypoint_same_edge(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'queue.02') + test.run() + self.assertEqual(None, test.error) + + def test_03_waypoint_edge_interior(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + self.routers[0].addresses[0], + 'queue.03') + test.run() + self.assertEqual(None, test.error) + + def test_04_waypoint_interior_edge(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[2].addresses[0], + 'queue.04') + test.run() + self.assertEqual(None, test.error) + + def test_05_waypoint_interior_interior(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'queue.05') + test.run() + self.assertEqual(None, test.error) + + def test_06_waypoint_edge_edge(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + 'queue.06') + test.run() + self.assertEqual(None, test.error) + + def test_07_waypoint_edge_endpoints_int_1(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'queue.07') + test.run() + self.assertEqual(None, test.error) + + def test_08_waypoint_edge_endpoints_int_2(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[5].addresses[0], + 'queue.08') + test.run() + self.assertEqual(None, test.error) + + def test_09_waypoint_int_endpoints_edge_1(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + 'queue.09') + test.run() + self.assertEqual(None, test.error) + + def test_10_waypoint_int_endpoints_edge_2(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[1].addresses[0], + 'queue.10') + test.run() + self.assertEqual(None, test.error) + + def test_11_waypoint_int_endpoints_int_1(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[0].addresses[0], + 'queue.11') + test.run() + self.assertEqual(None, test.error) + + def test_12_waypoint_int_endpoints_int_2(self): + test = WaypointTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[1].addresses[0], + 'queue.12') + test.run() + self.assertEqual(None, test.error) + + def test_13_waypoint_edge_endpoints_edge_1(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[3].addresses[0], + 'queue.13') + test.run() + self.assertEqual(None, test.error) + + def test_14_waypoint_edge_endpoints_edge_2(self): + test = WaypointTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[4].addresses[0], + 'queue.14') + test.run() + self.assertEqual(None, test.error) + + def test_15_multiphase_1(self): + test = MultiPhaseTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + [ + self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0], + self.routers[3].addresses[0], + self.routers[4].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0] + ], + 'multi.15') + test.run() + self.assertEqual(None, test.error) + + def test_16_multiphase_2(self): + test = MultiPhaseTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + [ + self.routers[5].addresses[0], + self.routers[3].addresses[0], + self.routers[1].addresses[0], + self.routers[4].addresses[0], + self.routers[2].addresses[0], + self.routers[0].addresses[0], + self.routers[5].addresses[0], + self.routers[3].addresses[0], + self.routers[1].addresses[0] + ], + 'multi.16') + test.run() + self.assertEqual(None, test.error) + + def test_17_multiphase_3(self): + test = MultiPhaseTest(self.routers[1].addresses[0], + self.routers[0].addresses[0], + [ + self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0], + self.routers[3].addresses[0], + self.routers[4].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0] + ], + 'multi.17') + test.run() + self.assertEqual(None, test.error) + + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class WaypointTest(MessagingHandler): + def __init__(self, sender_host, receiver_host, waypoint_host, addr): + super(WaypointTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.waypoint_host = waypoint_host + self.addr = addr + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.waypoint_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_thru = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%d" % (self.n_tx, self.n_rx, self.n_thru) + self.sender_conn.close() + self.receiver_conn.close() + self.waypoint_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.waypoint_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.receiver_conn = event.container.connect(self.receiver_host) + self.waypoint_conn = event.container.connect(self.waypoint_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + self.wp_sender = event.container.create_sender(self.waypoint_conn, self.addr) + self.wp_receiver = event.container.create_receiver(self.waypoint_conn, self.addr) + self.wp_sender.target.capabilities.put_object(symbol("qd.waypoint")) + self.wp_receiver.source.capabilities.put_object(symbol("qd.waypoint")) + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + self.n_rx += 1 + if self.n_rx == self.count and self.n_thru == self.count: + self.fail(None) + elif event.receiver == self.wp_receiver: + self.n_thru += 1 + self.wp_sender.send(Message(event.message.body)) + + def run(self): + Container(self).run() + + +class MultiPhaseTest(MessagingHandler): + def __init__(self, sender_host, receiver_host, waypoint_hosts, addr): + super(MultiPhaseTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.waypoint_hosts = waypoint_hosts + self.addr = addr + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.waypoint_conns = [] + self.wp_senders = [] + self.wp_receivers = [] + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_thru = [0,0,0,0,0,0,0,0,0] + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%r" % (self.n_tx, self.n_rx, self.n_thru) + self.sender_conn.close() + self.receiver_conn.close() + for c in self.waypoint_conns: + c.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + for c in self.waypoint_conns: + c.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.receiver_conn = event.container.connect(self.receiver_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + for host in self.waypoint_hosts: + self.waypoint_conns.append(event.container.connect(host)) + + ordinal = 1 + for conn in self.waypoint_conns: + sender = event.container.create_sender(conn, self.addr) + receiver = event.container.create_receiver(conn, self.addr) + + sender.target.capabilities.put_object(symbol("qd.waypoint.%d" % ordinal)) + receiver.source.capabilities.put_object(symbol("qd.waypoint.%d" % ordinal)) + + self.wp_senders.append(sender) + self.wp_receivers.append(receiver) + ordinal += 1 + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + self.n_rx += 1 + if self.n_rx == self.count: + self.fail(None) + else: + idx = 0 + for receiver in self.wp_receivers: + if event.receiver == receiver: + self.n_thru[idx] += 1 + self.wp_senders[idx].send(Message(event.message.body)) + return + idx += 1 + + def run(self): + Container(self).run() + + +if __name__== '__main__': + unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org