This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 9c2e0b4 DISPATCH-1337 - Added fallback-link feature This closes #507 9c2e0b4 is described below commit 9c2e0b4f4564a5cbcd7bf97af62f0078c7e1f04e Author: Ted Ross <tr...@redhat.com> AuthorDate: Wed Apr 24 15:14:20 2019 -0400 DISPATCH-1337 - Added fallback-link feature This closes #507 --- console/stand-alone/package-lock.json | 3 +- docs/books/user-guide/configuration-security.adoc | 3 + include/qpid/dispatch/amqp.h | 1 + include/qpid/dispatch/iterator.h | 2 + include/qpid/dispatch/router_core.h | 2 + python/qpid_dispatch/management/qdrouter.json | 34 +- .../qpid_dispatch_internal/policy/policy_local.py | 4 + src/amqp.c | 1 + src/http-libwebsockets.c | 4 +- src/policy.c | 41 ++ src/policy.h | 1 + src/router_config.c | 17 +- src/router_core/agent_address.c | 6 + src/router_core/agent_address.h | 2 +- src/router_core/agent_config_address.c | 27 +- src/router_core/agent_config_address.h | 2 +- src/router_core/agent_config_auto_link.c | 13 +- src/router_core/agent_config_auto_link.h | 2 +- src/router_core/agent_router.c | 6 + src/router_core/agent_router.h | 2 +- src/router_core/connections.c | 8 +- src/router_core/forwarder.c | 14 +- .../modules/address_lookup_client/lookup_client.c | 75 +- src/router_core/modules/edge_router/addr_proxy.c | 58 +- src/router_core/route_control.c | 14 +- src/router_core/route_control.h | 7 +- src/router_core/router_core.c | 84 ++- src/router_core/router_core_private.h | 38 +- src/router_core/transfer.c | 58 +- tests/CMakeLists.txt | 1 + tests/policy-3/test-sender-receiver-limits.json | 46 ++ tests/system_tests_fallback_dest.py | 781 +++++++++++++++++++++ tests/system_tests_http.py | 1 + tests/system_tests_policy.py | 43 +- tools/qdstat.in | 8 +- 35 files changed, 1305 insertions(+), 104 deletions(-) diff --git a/console/stand-alone/package-lock.json b/console/stand-alone/package-lock.json index f372c16..8cd46be 100644 --- a/console/stand-alone/package-lock.json +++ b/console/stand-alone/package-lock.json @@ -3385,7 +3385,8 @@ "version": "1.1.0", "resolved": "https://registry.npmjs.org/console-control-strings/-/console-control-strings-1.1.0.tgz", "integrity": "sha1-PXz0Rk22RG6mRL9LOVB/mFEAjo4=", - "dev": true + "dev": true, + "optional": true }, "constants-browserify": { "version": "1.0.0", diff --git a/docs/books/user-guide/configuration-security.adoc b/docs/books/user-guide/configuration-security.adoc index e2a2de2..a3a936c 100644 --- a/docs/books/user-guide/configuration-security.adoc +++ b/docs/books/user-guide/configuration-security.adoc @@ -630,6 +630,9 @@ If true, connections from users in this group are permitted to attach links usin `allowDynamicLinkRoutes`:: If true, connections from users in this group may dynamically create connection-scoped link route destinations. This allows endpoints to act as link route destinations (i.e. brokers) without the need for configuring link-routes. If false, creation of dynamic link route destintations is forbidden. +`allowFallbackLinks`:: +If true, connections from users in this group are permitted to attach links using fallback-link capabilities. This allows endpoints to act as fallback destinations (and sources) for addresses that have fallback enabled. If false, use of fallback-link capabilities is forbidden. + `sources` | `sourcePattern`:: A list of AMQP source addresses from which users in this group may receive messages. + diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 8a17c4c..80447be 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -137,6 +137,7 @@ extern const char * const QD_CAPABILITY_WAYPOINT6; extern const char * const QD_CAPABILITY_WAYPOINT7; extern const char * const QD_CAPABILITY_WAYPOINT8; extern const char * const QD_CAPABILITY_WAYPOINT9; +extern const char * const QD_CAPABILITY_FALLBACK; /// @} /** @name Dynamic Node Properties */ diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h index 3bc86e1..80fd383 100644 --- a/include/qpid/dispatch/iterator.h +++ b/include/qpid/dispatch/iterator.h @@ -59,6 +59,8 @@ typedef struct qd_iterator_t qd_iterator_t; #define QD_ITER_HASH_PREFIX_GLOBAL_PLACEHOLDER 'G' #define QD_ITER_HASH_PREFIX_EDGE_SUMMARY 'H' +#define QD_ITER_HASH_PHASE_FALLBACK 'F' + /** * qd_iterator_view_t diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 035781f..1b2702b 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -174,6 +174,7 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void* * @param strip_annotations_in True if configured to remove annotations on inbound messages. * @param strip_annotations_out True if configured to remove annotations on outbound messages. * @param policy_allow_dynamic_link_routes True if this connection is allowed by policy to create link route destinations. + * @param policy_allow_admin_status_update True if this connection is allowed to modify admin_status on other connections. * @param link_capacity The capacity, in deliveries, for links in this connection. * @param vhost If non-null, this is the vhost of the connection to be used for multi-tenancy. * @return Pointer to a connection object that can be used to refer to this connection over its lifetime. @@ -853,6 +854,7 @@ typedef struct { size_t deliveries_egress_route_container; size_t deliveries_delayed_1sec; size_t deliveries_delayed_10sec; + size_t deliveries_redirected_to_fallback; } qdr_global_stats_t; ALLOC_DECLARE(qdr_global_stats_t); typedef void (*qdr_global_stats_handler_t) (void *context); diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 0df3a06..2b494fc 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -588,7 +588,12 @@ "type": "integer", "description":"Number of deliveries that were sent to route container connections.", "graph": true - } + }, + "deliveriesRedirectedToFallback": { + "type": "integer", + "description": "Number of deliveries that were sent to the fallback destination due to the primary destination being unreachable.", + "graph": true + } } }, "sslProfile": { @@ -1167,6 +1172,13 @@ "description": "All messages sent to this address which lack an intrinsic priority will be assigned this priority.", "create": true, "required": false + }, + "enableFallback": { + "type": "boolean", + "description": "If false, undeliverable messages are released. If true, undeliverable messages shall be re-delivered to a fallback destination. The fallback destination uses the same address, but is attached using an autoLink with 'fallback' enabled or a link with the qd.fallback capability.", + "create": true, + "required": false, + "default": false } } }, @@ -1278,6 +1290,14 @@ "create": true, "required": false }, + "fallback": + { + "type": "boolean", + "description": "If true, this auto-link is attached to a fallback destination for an address.", + "create": true, + "required": false, + "default": false + }, "linkRef": { "type": "string", "description": "Reference to the org.apache.qpid.dispatch.router.link if the link exists", @@ -1566,6 +1586,11 @@ "description": "Number of deliveries that were sent to a route-container address.", "graph": true }, + "deliveriesRedirectedToFallback": { + "type": "integer", + "description": "Number of deliveries that were sent to the fallback destination due to the primary destination being unreachable.", + "graph": true + }, "key": { "description": "Internal unique (to this router) key to identify the address", "type": "string" @@ -1955,6 +1980,13 @@ "required": false, "create": true }, + "allowFallbackLinks": { + "type": "boolean", + "description": "Whether this connection is allowed to claim 'qd.fallback' capability for attached links. This allows endpoints to act as fallback destinations for addresses that have fallback capability enabled.", + "default": true, + "required": false, + "create": true + }, "sources": { "type": "string", "description": "A list of source addresses from which users in this group may receive messages. To specify multiple addresses, separate the addresses with either a comma or a space. If you do not specify any addresses, users in this group are not allowed to receive messages from any addresses. You can use the substitution token '${user}' to specify an address that contains a user's authenticated user name. You can use an asterisk ('*') wildcard to match one or more ch [...] diff --git a/python/qpid_dispatch_internal/policy/policy_local.py b/python/qpid_dispatch_internal/policy/policy_local.py index a7ab8b9..ecf717c 100644 --- a/python/qpid_dispatch_internal/policy/policy_local.py +++ b/python/qpid_dispatch_internal/policy/policy_local.py @@ -71,6 +71,7 @@ class PolicyKeys(object): KW_ALLOW_ANONYMOUS_SENDER = "allowAnonymousSender" KW_ALLOW_USERID_PROXY = "allowUserIdProxy" KW_ALLOW_WAYPOINT_LINKS = "allowWaypointLinks" + KW_ALLOW_FALLBACK_LINKS = "allowFallbackLinks" KW_ALLOW_DYNAMIC_LINK_ROUTES = "allowDynamicLinkRoutes" KW_ALLOW_ADMIN_STATUS_UPDATE = "allowAdminStatusUpdate" KW_SOURCES = "sources" @@ -147,6 +148,7 @@ class PolicyCompiler(object): PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER, PolicyKeys.KW_ALLOW_USERID_PROXY, PolicyKeys.KW_ALLOW_WAYPOINT_LINKS, + PolicyKeys.KW_ALLOW_FALLBACK_LINKS, PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES, PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE, PolicyKeys.KW_SOURCES, @@ -251,6 +253,7 @@ class PolicyCompiler(object): policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True + policy_out[PolicyKeys.KW_ALLOW_FALLBACK_LINKS] = True policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES] = True policy_out[PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE] = True policy_out[PolicyKeys.KW_SOURCES] = '' @@ -290,6 +293,7 @@ class PolicyCompiler(object): PolicyKeys.KW_ALLOW_DYNAMIC_SRC, PolicyKeys.KW_ALLOW_USERID_PROXY, PolicyKeys.KW_ALLOW_WAYPOINT_LINKS, + PolicyKeys.KW_ALLOW_FALLBACK_LINKS, PolicyKeys.KW_ALLOW_DYNAMIC_LINK_ROUTES, PolicyKeys.KW_ALLOW_ADMIN_STATUS_UPDATE ]: diff --git a/src/amqp.c b/src/amqp.c index d0b19c2..de89637 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -45,6 +45,7 @@ const char * const QD_CAPABILITY_WAYPOINT6 = "qd.waypoint.6"; const char * const QD_CAPABILITY_WAYPOINT7 = "qd.waypoint.7"; const char * const QD_CAPABILITY_WAYPOINT8 = "qd.waypoint.8"; const char * const QD_CAPABILITY_WAYPOINT9 = "qd.waypoint.9"; +const char * const QD_CAPABILITY_FALLBACK = "qd.fallback"; const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY"; const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address"; diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index a74e9ca..98f232a 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -471,6 +471,7 @@ static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stat static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; } static int stats_get_deliveries_delayed_1sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_1sec; } static int stats_get_deliveries_delayed_10sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_10sec; } +static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; } static struct metric_definition metrics[] = { {"connections", "gauge", stats_get_connections}, @@ -491,7 +492,8 @@ static struct metric_definition metrics[] = { {"deliveries_ingress_route_container", "counter", stats_get_deliveries_ingress_route_container}, {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container}, {"deliveries_delayed_1sec", "counter", stats_get_deliveries_delayed_1sec}, - {"deliveries_delayed_10sec", "counter", stats_get_deliveries_delayed_10sec} + {"deliveries_delayed_10sec", "counter", stats_get_deliveries_delayed_10sec}, + {"deliveries_redirected_to_fallback", "counter", stats_get_deliveries_redirected_to_fallback} }; static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); diff --git a/src/policy.c b/src/policy.c index 12afd74..dec214d 100644 --- a/src/policy.c +++ b/src/policy.c @@ -466,6 +466,7 @@ bool qd_policy_open_fetch_settings( } settings->allowUserIdProxy = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false); settings->allowWaypointLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true); + settings->allowFallbackLinks = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowFallbackLinks", true); settings->allowDynamicLinkRoutes = qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicLinkRoutes", true); // @@ -928,6 +929,24 @@ static bool qd_policy_terminus_is_waypoint(pn_terminus_t *term) return false; } + +static bool qd_policy_terminus_is_fallback(pn_terminus_t *term) +{ + pn_data_t *cap = pn_terminus_capabilities(term); + if (cap) { + pn_data_rewind(cap); + pn_data_next(cap); + if (cap && pn_data_type(cap) == PN_SYMBOL) { + pn_bytes_t sym = pn_data_get_symbol(cap); + if (strcmp(sym.start, QD_CAPABILITY_FALLBACK) == 0) + return true; + } + } + + return false; +} + + bool qd_policy_approve_message_target(qd_iterator_t *address, qd_connection_t *qd_conn) { #define ON_STACK_SIZE 2048 @@ -1001,6 +1020,17 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t *qd_ } } + if (!qd_conn->policy_settings->allowFallbackLinks) { + bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_target(pn_link)); + if (fallback) { + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, + "[%"PRIu64"]: DENY AMQP Attach sender link '%s' for user '%s', rhost '%s', vhost '%s'. Fallback capability not permitted", + qd_conn->connection_id, target, qd_conn->user_id, hostip, vhost); + _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, QD_AMQP_COND_UNAUTHORIZED_ACCESS); + return false; + } + } + lookup = qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings, target, false); qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), @@ -1074,6 +1104,17 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t *pn_link, qd_connection_t *q } } + if (!qd_conn->policy_settings->allowFallbackLinks) { + bool fallback = qd_policy_terminus_is_fallback(pn_link_remote_source(pn_link)); + if (fallback) { + qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO, + "[%"PRIu64"]: DENY AMQP Attach receiver link '%s' for user '%s', rhost '%s', vhost '%s'. Fallback capability not permitted", + qd_conn->connection_id, source, qd_conn->user_id, hostip, vhost); + _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, QD_AMQP_COND_UNAUTHORIZED_ACCESS); + return false; + } + } + bool lookup = qd_policy_approve_link_name(qd_conn->user_id, qd_conn->policy_settings, source, true); qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, (lookup ? QD_LOG_TRACE : QD_LOG_INFO), diff --git a/src/policy.h b/src/policy.h index 26803d3..5d523fa 100644 --- a/src/policy.h +++ b/src/policy.h @@ -52,6 +52,7 @@ struct qd_policy__settings_s { bool allowAnonymousSender; bool allowUserIdProxy; bool allowWaypointLinks; + bool allowFallbackLinks; bool allowDynamicLinkRoutes; bool allowAdminStatusUpdate; bool outgoingConnection; diff --git a/src/router_config.c b/src/router_config.c index adb46ba..2471b14 100644 --- a/src/router_config.c +++ b/src/router_config.c @@ -75,10 +75,11 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) break; } - bool waypoint = qd_entity_opt_bool(entity, "waypoint", false); - long in_phase = qd_entity_opt_long(entity, "ingressPhase", -1); - long out_phase = qd_entity_opt_long(entity, "egressPhase", -1); - long priority = qd_entity_opt_long(entity, "priority", -1); + bool waypoint = qd_entity_opt_bool(entity, "waypoint", false); + long in_phase = qd_entity_opt_long(entity, "ingressPhase", -1); + long out_phase = qd_entity_opt_long(entity, "egressPhase", -1); + long priority = qd_entity_opt_long(entity, "priority", -1); + bool fallback = qd_entity_opt_bool(entity, "enableFallback", false); // // Formulate this configuration create it through the core management API. @@ -112,6 +113,8 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) qd_compose_insert_string(body, "priority"); qd_compose_insert_long(body, priority); + qd_compose_insert_string(body, "fallback"); + qd_compose_insert_bool(body, fallback); if (in_phase >= 0) { qd_compose_insert_string(body, "ingressPhase"); @@ -262,7 +265,8 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit container = qd_entity_opt_string(entity, "containerId", 0); QD_ERROR_BREAK(); c_name = qd_entity_opt_string(entity, "connection", 0); QD_ERROR_BREAK(); ext_addr = qd_entity_opt_string(entity, "externalAddress", 0); QD_ERROR_BREAK(); - long phase = qd_entity_opt_long(entity, "phase", -1); QD_ERROR_BREAK(); + long phase = qd_entity_opt_long(entity, "phase", -1); QD_ERROR_BREAK(); + bool fallback = qd_entity_opt_bool(entity, "fallback", false); QD_ERROR_BREAK(); // // Formulate this configuration as a route and create it through the core management API. @@ -305,6 +309,9 @@ qd_error_t qd_router_configure_auto_link(qd_router_t *router, qd_entity_t *entit qd_compose_insert_string(body, ext_addr); } + qd_compose_insert_string(body, "fallback"); + qd_compose_insert_bool(body, fallback); + qd_compose_end_map(body); qdi_router_configure_body(router->router_core, body, QD_ROUTER_CONFIG_AUTO_LINK, name); diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c index 8c01588..1662f0b 100644 --- a/src/router_core/agent_address.c +++ b/src/router_core/agent_address.c @@ -40,6 +40,7 @@ #define QDR_ADDRESS_TRANSIT_OUTSTANDING 17 #define QDR_ADDRESS_TRACKED_DELIVERIES 18 #define QDR_ADDRESS_PRIORITY 19 +#define QDR_ADDRESS_DELIVERIES_REDIRECTED 20 const char *qdr_address_columns[] = {"name", @@ -62,6 +63,7 @@ const char *qdr_address_columns[] = "transitOutstanding", "trackedDeliveries", "priority", + "deliveriesRedirectedToFallback", 0}; @@ -173,6 +175,10 @@ static void qdr_insert_address_columns_CT(qdr_core_t *core, qd_compose_insert_int(body, addr->priority); break; + case QDR_ADDRESS_DELIVERIES_REDIRECTED: + qd_compose_insert_ulong(body, addr->deliveries_redirected); + break; + default: qd_compose_insert_null(body); break; diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h index 58c0d91..55a4cb5 100644 --- a/src/router_core/agent_address.h +++ b/src/router_core/agent_address.h @@ -31,7 +31,7 @@ void qdra_address_get_CT(qdr_core_t *core, const char *qdr_address_columns[]); -#define QDR_ADDRESS_COLUMN_COUNT 20 +#define QDR_ADDRESS_COLUMN_COUNT 21 const char *qdr_address_columns[QDR_ADDRESS_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_config_address.c b/src/router_core/agent_config_address.c index a3e034e..393e191 100644 --- a/src/router_core/agent_config_address.c +++ b/src/router_core/agent_config_address.c @@ -32,6 +32,7 @@ #define QDR_CONFIG_ADDRESS_OUT_PHASE 7 #define QDR_CONFIG_ADDRESS_PATTERN 8 #define QDR_CONFIG_ADDRESS_PRIORITY 9 +#define QDR_CONFIG_ADDRESS_FALLBACK 10 const char *qdr_config_address_columns[] = {"name", @@ -44,6 +45,7 @@ const char *qdr_config_address_columns[] = "egressPhase", "pattern", "priority", + "fallback", 0}; const char *CONFIG_ADDRESS_TYPE = "org.apache.qpid.dispatch.router.config.address"; @@ -125,6 +127,10 @@ static void qdr_config_address_insert_column_CT(qdr_address_config_t *addr, int case QDR_CONFIG_ADDRESS_PRIORITY: qd_compose_insert_int(body, addr->priority); break; + + case QDR_CONFIG_ADDRESS_FALLBACK: + qd_compose_insert_bool(body, addr->fallback); + break; } } @@ -347,6 +353,13 @@ void qdra_config_address_create_CT(qdr_core_t *core, qd_parsed_field_t *in_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]); qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]); qd_parsed_field_t *priority_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]); + qd_parsed_field_t *fallback_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_FALLBACK]); + + bool waypoint = waypoint_field ? qd_parse_as_bool(waypoint_field) : false; + long in_phase = in_phase_field ? qd_parse_as_long(in_phase_field) : -1; + long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1; + long priority = priority_field ? qd_parse_as_long(priority_field) : -1; + bool fallback = fallback_field ? qd_parse_as_bool(fallback_field) : false; // // Either a prefix or a pattern field is mandatory. Prefix and pattern @@ -358,6 +371,11 @@ void qdra_config_address_create_CT(qdr_core_t *core, } else if (prefix_field && pattern_field) { msg = "Cannot specify both a 'prefix' and a 'pattern' attribute"; } + + if (fallback && (waypoint || in_phase > 0 || out_phase > 0)) { + msg = "Fallback cannot be specified with waypoint or non-zero ingress and egress phases"; + } + if (msg) { query->status = QD_AMQP_BAD_REQUEST; query->status.description = msg; @@ -389,12 +407,6 @@ void qdra_config_address_create_CT(qdr_core_t *core, break; } - - bool waypoint = waypoint_field ? qd_parse_as_bool(waypoint_field) : false; - long in_phase = in_phase_field ? qd_parse_as_long(in_phase_field) : -1; - long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1; - long priority = priority_field ? qd_parse_as_long(priority_field) : -1; - // // Handle the address-phasing logic. If the phases are provided, use them. Otherwise // use the waypoint flag to set the most common defaults. @@ -429,7 +441,7 @@ void qdra_config_address_create_CT(qdr_core_t *core, // addr = new_qdr_address_config_t(); - DEQ_ITEM_INIT(addr); + ZERO(addr); addr->ref_count = 1; // Represents the reference from the addr_config list addr->name = name ? (char*) qd_iterator_copy(name) : 0; addr->identity = qdr_identifier(core); @@ -439,6 +451,7 @@ void qdra_config_address_create_CT(qdr_core_t *core, addr->is_prefix = !!prefix_field; addr->pattern = pattern; addr->priority = priority; + addr->fallback = fallback; pattern = 0; qd_iterator_reset_view(iter, ITER_VIEW_ALL); diff --git a/src/router_core/agent_config_address.h b/src/router_core/agent_config_address.h index 38c3365..06d56f9 100644 --- a/src/router_core/agent_config_address.h +++ b/src/router_core/agent_config_address.h @@ -35,7 +35,7 @@ void qdra_config_address_get_CT(qdr_core_t *core, char *qdra_config_address_validate_pattern_CT(qd_parsed_field_t *pattern_field, bool is_prefix, const char **error); -#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 10 +#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 11 const char *qdr_config_address_columns[QDR_CONFIG_ADDRESS_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_config_auto_link.c b/src/router_core/agent_config_auto_link.c index b381adf..d231e44 100644 --- a/src/router_core/agent_config_auto_link.c +++ b/src/router_core/agent_config_auto_link.c @@ -38,6 +38,7 @@ #define QDR_CONFIG_AUTO_LINK_LINK_REF 12 #define QDR_CONFIG_AUTO_LINK_OPER_STATUS 13 #define QDR_CONFIG_AUTO_LINK_LAST_ERROR 14 +#define QDR_CONFIG_AUTO_LINK_FALLBACK 15 const char *qdr_config_auto_link_columns[] = {"name", @@ -55,6 +56,7 @@ const char *qdr_config_auto_link_columns[] = "linkRef", "operStatus", "lastError", + "fallback", 0}; const char *CONFIG_AUTOLINK_TYPE = "org.apache.qpid.dispatch.router.config.autoLink"; @@ -161,6 +163,10 @@ static void qdr_config_auto_link_insert_column_CT(qdr_auto_link_t *al, int col, else qd_compose_insert_null(body); break; + + case QDR_CONFIG_AUTO_LINK_FALLBACK: + qd_compose_insert_bool(body, al->fallback); + break; } } @@ -391,7 +397,7 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, qd_parsed_field_t *phase_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_PHASE]); qd_parsed_field_t *connection_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_CONNECTION]); qd_parsed_field_t *container_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_CONTAINER_ID]); - + qd_parsed_field_t *fallback_field = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_FALLBACK]); qd_parsed_field_t *external_addr = qd_parse_value_by_key(in_body, qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_EXT_ADDRESS]); if (!external_addr) { @@ -408,6 +414,7 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, break; } + bool fallback = !!fallback_field ? qd_parse_as_bool(fallback_field) : false; // // Addr and direction fields are mandatory. Fail if they're not both here. @@ -432,7 +439,7 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, // Use the specified phase if present. Otherwise default based on the direction: // Phase 0 for outgoing links and phase 1 for incoming links. // - long phase = phase_field ? qd_parse_as_long(phase_field) : (dir == QD_OUTGOING ? 0 : 1); + long phase = phase_field ? qd_parse_as_long(phase_field) : ((dir == QD_OUTGOING || !!fallback) ? 0 : 1); // // Validate the phase @@ -447,7 +454,7 @@ void qdra_config_auto_link_create_CT(qdr_core_t *core, // // The request is good. Create the entity. // - al = qdr_route_add_auto_link_CT(core, name, addr_field, dir, phase, container_field, connection_field, external_addr); + al = qdr_route_add_auto_link_CT(core, name, addr_field, dir, phase, container_field, connection_field, external_addr, fallback); // // Compose the result map for the response. diff --git a/src/router_core/agent_config_auto_link.h b/src/router_core/agent_config_auto_link.h index 6ff9438..56327cf 100644 --- a/src/router_core/agent_config_auto_link.h +++ b/src/router_core/agent_config_auto_link.h @@ -32,7 +32,7 @@ void qdra_config_auto_link_get_CT(qdr_core_t *core, qd_iterator_t *identity, qdr_query_t *query, const char *qdr_config_auto_link_columns[]); -#define QDR_CONFIG_AUTO_LINK_COLUMN_COUNT 15 +#define QDR_CONFIG_AUTO_LINK_COLUMN_COUNT 16 const char *qdr_config_auto_link_columns[QDR_CONFIG_AUTO_LINK_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index d7ea523..2c4f89a 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -50,6 +50,7 @@ #define QDR_ROUTER_DELIVERIES_TRANSIT 23 #define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 24 #define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 25 +#define QDR_ROUTER_DELIVERIES_REDIRECTED 26 const char *qdr_router_columns[] = @@ -79,6 +80,7 @@ const char *qdr_router_columns[] = "deliveriesTransit", "deliveriesIngressRouteContainer", "deliveriesEgressRouteContainer", + "deliveriesRedirectedToFallback", 0}; @@ -206,6 +208,10 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_ulong(body, core->deliveries_egress_route_container); break; + case QDR_ROUTER_DELIVERIES_REDIRECTED: + qd_compose_insert_ulong(body, core->deliveries_redirected); + break; + default: qd_compose_insert_null(body); break; diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index 6a6e35f..dc9c7ce 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_COLUMN_COUNT 26 +#define QDR_ROUTER_COLUMN_COUNT 27 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 086453c..01f4175 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -1171,7 +1171,8 @@ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr) && addr->ref_count == 0 && !addr->block_deletion && addr->tracked_deliveries == 0 - && addr->core_endpoint == 0) { + && addr->core_endpoint == 0 + && addr->fallback_for == 0) { qdr_core_remove_address(core, addr); } } @@ -1616,7 +1617,10 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac // Issue credit if this is an anonymous link or if its address has at least one reachable destination. // qdr_address_t *addr = link->owning_addr; - if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes))) + if (!addr || (DEQ_SIZE(addr->subscriptions) || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes) + || (!!addr->fallback && (DEQ_SIZE(addr->fallback->subscriptions) + || DEQ_SIZE(addr->fallback->rlinks) + || qd_bitmask_cardinality(addr->fallback->rnodes))))) qdr_link_issue_credit_CT(core, link, link->capacity, false); break; diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 8e7a8bb..cfb9fe4 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -380,7 +380,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, DEQ_INSERT_TAIL(deliver_info_list, deliver_info); fanout++; - if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) { + if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER && !out_link->fallback) { addr->deliveries_egress++; core->deliveries_egress++; } @@ -585,8 +585,10 @@ int qdr_forward_closest_CT(qdr_core_t *core, DEQ_INSERT_TAIL(addr->rlinks, link_ref); } - addr->deliveries_egress++; - core->deliveries_egress++; + if (!out_link->fallback) { + addr->deliveries_egress++; + core->deliveries_egress++; + } if (qdr_connection_route_container(out_link->conn)) { core->deliveries_egress_route_container++; @@ -800,8 +802,10 @@ int qdr_forward_balanced_CT(qdr_core_t *core, core->deliveries_transit++; } else { - addr->deliveries_egress++; - core->deliveries_egress++; + if (!chosen_link->fallback) { + addr->deliveries_egress++; + core->deliveries_egress++; + } if (qdr_connection_route_container(chosen_link->conn)) { core->deliveries_egress_route_container++; diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c index c71673f..214624b 100644 --- a/src/router_core/modules/address_lookup_client/lookup_client.c +++ b/src/router_core/modules/address_lookup_client/lookup_client.c @@ -122,6 +122,7 @@ static void qdr_generate_mobile_addr(qdr_core_t *core, char *buffer, size_t leng * @param [out] link_route True iff the lookup indicates that an attach should be routed * @param [out] unavailable True iff this address is blocked as unavailable * @param [out] core_endpoint True iff this address is bound to a core-internal endpoint + * @param [out] fallback True iff this terminus has fallback capability * @return Pointer to an address record or 0 if none is found */ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, @@ -132,7 +133,8 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, bool accept_dynamic, bool *link_route, bool *unavailable, - bool *core_endpoint) + bool *core_endpoint, + bool *fallback) { qdr_address_t *addr = 0; @@ -142,6 +144,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, *link_route = false; *unavailable = false; *core_endpoint = false; + *fallback = false; if (qdr_terminus_is_dynamic(terminus)) { // @@ -235,10 +238,10 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, // // There was no match for a link-route destination, look for a message-route address. // - int in_phase = 0; - int out_phase = 0; - int addr_phase; - int priority = -1; + int in_phase = 0; + int out_phase = 0; + char addr_phase; + int priority = -1; qd_address_treatment_t treat = core->qd->default_treatment; qdr_address_config_t *addr_config = qdr_config_for_address_CT(core, conn, iter); @@ -258,30 +261,51 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, out_phase = waypoint_ordinal - 1; } + // + // Determine if this endpoint is acting as a fallback destination for the address. + // + *fallback = qdr_terminus_has_capability(terminus, QD_CAPABILITY_FALLBACK); + bool edge_link = conn->role == QDR_ROLE_EDGE_CONNECTION; + qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override - addr_phase = dir == QD_INCOMING ? in_phase : out_phase; - qd_iterator_annotate_phase(iter, (char) addr_phase + '0'); + addr_phase = dir == QD_INCOMING ? + (*fallback && edge_link ? QD_ITER_HASH_PHASE_FALLBACK : in_phase + '0') : + (*fallback ? QD_ITER_HASH_PHASE_FALLBACK : out_phase + '0'); + qd_iterator_annotate_phase(iter, addr_phase); qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); if (addr && addr->treatment == QD_TREATMENT_UNAVAILABLE) *unavailable = true; - if (!addr && create_if_not_found) { + // + // If the address is a router-class address, change treatment to closest. + // + qd_iterator_reset(iter); + if (qd_iterator_octet(iter) == (unsigned char) QD_ITER_HASH_PREFIX_ROUTER) { + treat = QD_TREATMENT_ANYCAST_CLOSEST; + // - // If the address is a router-class address, change treatment to closest. + // It is not valid for an outgoing link to have a router-class address. // - qd_iterator_reset(iter); - if (qd_iterator_octet(iter) == (unsigned char) QD_ITER_HASH_PREFIX_ROUTER) { - treat = QD_TREATMENT_ANYCAST_CLOSEST; - } + if (dir == QD_OUTGOING) + return 0; + } + if (!addr && create_if_not_found) { addr = qdr_address_CT(core, treat, addr_config); if (addr) { - addr->config = addr_config; + qd_iterator_reset(iter); qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); + + // + // If this address is configured with a fallback, set up the + // fallback address linkage. + // + if (!!addr_config && addr_config->fallback && !addr->fallback) + qdr_setup_fallback_address_CT(core, addr); } if (!addr && treat == QD_TREATMENT_UNAVAILABLE) @@ -309,8 +333,11 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, qdr_terminus_t *target, bool link_route, bool unavailable, - bool core_endpoint) + bool core_endpoint, + bool fallback) { + link->fallback = fallback; + if (core_endpoint) { qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target); } @@ -386,7 +413,11 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core, || DEQ_SIZE(addr->rlinks) || qd_bitmask_cardinality(addr->rnodes) || qdr_is_addr_treatment_multicast(addr) - || !!addr->exchange)) { + || !!addr->exchange + || (!!addr->fallback + && (DEQ_SIZE(addr->fallback->subscriptions) + || DEQ_SIZE(addr->fallback->rlinks) + || qd_bitmask_cardinality(addr->fallback->rnodes))))) { qdr_link_issue_credit_CT(core, link, link->capacity, false); } @@ -405,6 +436,7 @@ static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_l bool link_route; bool unavailable; bool core_endpoint; + bool fallback; qdr_connection_t *conn = safe_deref_qdr_connection_t(request->conn_sp); qdr_link_t *link = safe_deref_qdr_link_t(request->link_sp); @@ -420,7 +452,8 @@ static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_l true, &link_route, &unavailable, - &core_endpoint); + &core_endpoint, + &fallback); qdr_link_react_to_first_attach_CT(client->core, conn, addr, @@ -430,7 +463,8 @@ static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_l request->target, link_route, unavailable, - core_endpoint); + core_endpoint, + fallback); } @@ -507,6 +541,7 @@ static void qcm_addr_lookup_CT(void *context, bool link_route; bool unavailable; bool core_endpoint; + bool fallback; qdr_terminus_t *term = dir == QD_INCOMING ? target : source; if (client->core->router_mode == QD_ROUTER_MODE_EDGE @@ -535,9 +570,9 @@ static void qcm_addr_lookup_CT(void *context, // If this lookup doesn't meet the criteria for asynchronous action, perform the built-in, synchronous address lookup // qdr_address_t *addr = qdr_lookup_terminus_address_CT(client->core, dir, conn, term, true, true, - &link_route, &unavailable, &core_endpoint); + &link_route, &unavailable, &core_endpoint, &fallback); qdr_link_react_to_first_attach_CT(client->core, conn, addr, link, dir, source, target, - link_route, unavailable, core_endpoint); + link_route, unavailable, core_endpoint, fallback); } diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 8a7a0ee..31f9fdc 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -89,9 +89,24 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr) } +static void set_fallback_capability(qdr_terminus_t *term) +{ + qdr_terminus_add_capability(term, QD_CAPABILITY_FALLBACK); +} + + static void set_waypoint_capability(qdr_terminus_t *term, char phase_char, qd_direction_t dir, int in_phase, int out_phase) { - int phase = (int) (phase_char - '0'); + int phase = (int) (phase_char - '0'); + bool fallback = phase_char == QD_ITER_HASH_PHASE_FALLBACK; + char cap[16]; + char suffix[3]; + + if (fallback) { + strncpy(cap, QD_CAPABILITY_FALLBACK, 15); + qdr_terminus_add_capability(term, cap); + return; + } // // For links that are outgoing on the in_phase or incoming on the out_phase, don't set the @@ -112,11 +127,8 @@ static void set_waypoint_capability(qdr_terminus_t *term, char phase_char, qd_di // In all remaining cases, the new links are acting as waypoints. // int ordinal = phase + (dir == QD_OUTGOING ? 0 : 1); - char cap[16]; - char suffix[3]; - strncpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT, 12); - //strcpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT); + strncpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT, 15); suffix[0] = '.'; suffix[1] = '0' + ordinal; suffix[2] = '\0'; @@ -129,13 +141,16 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t { if (addr->edge_inlink == 0) { qdr_terminus_t *term = qdr_terminus_normal(key + 2); + const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle); + + if (key[1] == QD_ITER_HASH_PHASE_FALLBACK) { + set_fallback_capability(term); - if (addr->config && addr->config->out_phase > 0) { + } else if (addr->config && addr->config->out_phase > 0) { // // If this address is configured as multi-phase, we may need to // add waypoint capabilities to the terminus. // - const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle); if (key[0] == QD_ITER_HASH_PREFIX_MOBILE) set_waypoint_capability(term, key[1], QD_INCOMING, addr->config->in_phase, addr->config->out_phase); } @@ -168,8 +183,12 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_ // for the address (see on_transfer below). // qdr_terminus_t *term = qdr_terminus_normal(key + 2); + const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle); - if (addr->config && addr->config->out_phase > 0) { + if (key[1] == QD_ITER_HASH_PHASE_FALLBACK) { + set_fallback_capability(term); + + } else if (addr->config && addr->config->out_phase > 0) { // // If this address is configured as multi-phase, we may need to // add waypoint capabilities to the terminus. @@ -259,6 +278,9 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c // if (DEQ_SIZE(addr->rlinks) > 0) { if (DEQ_SIZE(addr->rlinks) == 1) { + // + // If there's only one link and it's on the edge connection, ignore the address. + // qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks); if (ref->link->conn != ap->edge_conn) add_inlink(ap, key, addr); @@ -270,13 +292,27 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c // If the address has more than zero attached sources, create an outgoing link // to the interior to signal the presence of local producers. // + bool add = false; if (DEQ_SIZE(addr->inlinks) > 0) { if (DEQ_SIZE(addr->inlinks) == 1) { + // + // If there's only one link and it's on the edge connection, ignore the address. + // qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); if (ref->link->conn != ap->edge_conn) - add_outlink(ap, key, addr); + add = true; } else + add = true; + + if (add) { add_outlink(ap, key, addr); + + // + // If the address has a fallback address, add an outlink for that as well + // + if (!!addr->fallback) + add_outlink(ap, key, addr->fallback); + } } } addr = DEQ_NEXT(addr); @@ -345,7 +381,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr case QDRC_EVENT_ADDR_BECAME_SOURCE : link_ref = DEQ_HEAD(addr->inlinks); - if (link_ref->link->conn != ap->edge_conn) + if (!link_ref || link_ref->link->conn != ap->edge_conn) add_outlink(ap, key, addr); break; @@ -359,7 +395,7 @@ static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr case QDRC_EVENT_ADDR_ONE_SOURCE : link_ref = DEQ_HEAD(addr->inlinks); - if (link_ref->link->conn == ap->edge_conn) + if (!link_ref || link_ref->link->conn == ap->edge_conn) del_outlink(ap, addr); break; diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 4810b98..3249148 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -256,6 +256,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target); al->link->auto_link = al; al->link->phase = al->phase; + al->link->fallback = al->fallback; al->state = QDR_AUTO_LINK_STATE_ATTACHING; } else { @@ -456,7 +457,8 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, int phase, qd_parsed_field_t *container_field, qd_parsed_field_t *connection_field, - qd_parsed_field_t *external_addr) + qd_parsed_field_t *external_addr, + bool fallback) { qdr_auto_link_t *al = new_qdr_auto_link_t(); @@ -470,13 +472,15 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, al->phase = phase; al->state = QDR_AUTO_LINK_STATE_INACTIVE; al->external_addr = external_addr ? (char*) qd_iterator_copy(qd_parse_raw(external_addr)) : 0; + al->fallback = fallback; // // Find or create an address for the auto_link destination // + char phase_char = dir == QD_OUTGOING ? (fallback ? QD_ITER_HASH_PHASE_FALLBACK : phase + '0') : phase + '0'; qd_iterator_t *iter = qd_parse_raw(addr_field); qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - qd_iterator_annotate_phase(iter, (char) phase + '0'); + qd_iterator_annotate_phase(iter, phase_char); qd_hash_retrieve(core->addr_hash, iter, (void*) &al->addr); if (!al->addr) { @@ -490,6 +494,12 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, al->addr = qdr_address_CT(core, treatment, addr_config); DEQ_INSERT_TAIL(core->addrs, al->addr); qd_hash_insert(core->addr_hash, iter, al->addr, &al->addr->hash_handle); + + // + // If we just created an address that needs a fallback, set up the fallback now. + // + if (!!addr_config && addr_config->fallback && dir == QD_INCOMING) + qdr_setup_fallback_address_CT(core, al->addr); } al->addr->ref_count++; diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h index 8a1039b..51c9900 100644 --- a/src/router_core/route_control.h +++ b/src/router_core/route_control.h @@ -39,9 +39,10 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_parsed_field_t *addr_field, qd_direction_t dir, int phase, - qd_parsed_field_t *container_field, - qd_parsed_field_t *connection_field, - qd_parsed_field_t *external_addr); + qd_parsed_field_t *container_field, + qd_parsed_field_t *connection_field, + qd_parsed_field_t *external_addr, + bool fallback); void qdr_route_del_auto_link_CT(qdr_core_t *core, qdr_auto_link_t *auto_link); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 16d7242..500ebe8 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -333,12 +333,13 @@ qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment { if (treatment == QD_TREATMENT_UNAVAILABLE) return 0; + qdr_address_t *addr = new_qdr_address_t(); ZERO(addr); - addr->config = config; - addr->treatment = treatment; - addr->forwarder = qdr_forwarder_CT(core, treatment); - addr->rnodes = qd_bitmask(0); + addr->config = config; + addr->treatment = treatment; + addr->forwarder = qdr_forwarder_CT(core, treatment); + addr->rnodes = qd_bitmask(0); addr->add_prefix = 0; addr->del_prefix = 0; addr->priority = -1; @@ -518,6 +519,11 @@ void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) cr = DEQ_HEAD(addr->conns); } + if (!!addr->fallback) { + addr->fallback->fallback_for = 0; + qdr_check_addr_CT(core, addr->fallback); + } + free(addr->add_prefix); free(addr->del_prefix); free_qdr_address_t(addr); @@ -542,10 +548,15 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_DEST, addr); } else { // link->link_direction == QD_INCOMING qdr_add_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); - if (DEQ_SIZE(addr->inlinks) == 1) + if (DEQ_SIZE(addr->inlinks) == 1) { qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_SOURCE, addr); - else if (DEQ_SIZE(addr->inlinks) == 2) + if (!!addr->fallback && !link->fallback) + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_SOURCE, addr->fallback); + } else if (DEQ_SIZE(addr->inlinks) == 2) { qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_SOURCE, addr); + if (!!addr->fallback && !link->fallback) + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_TWO_SOURCE, addr->fallback); + } } } @@ -566,10 +577,15 @@ void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ } else { bool removed = qdr_del_link_ref(&addr->inlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (removed) { - if (DEQ_SIZE(addr->inlinks) == 0) + if (DEQ_SIZE(addr->inlinks) == 0) { qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_SOURCE, addr); - else if (DEQ_SIZE(addr->inlinks) == 1) + if (!!addr->fallback && !link->fallback) + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_NO_LONGER_SOURCE, addr->fallback); + } else if (DEQ_SIZE(addr->inlinks) == 1) { qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_SOURCE, addr); + if (!!addr->fallback && !link->fallback) + qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_ONE_SOURCE, addr->fallback); + } } } } @@ -597,6 +613,57 @@ void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ } +/** + * Search for, and possibly create, the fallback address based on the + * fallback flag in the address's configuration. This will be used in + * the forwarding paths to handle undeliverable messages with fallback destinations. + */ +void qdr_setup_fallback_address_CT(qdr_core_t *core, qdr_address_t *addr) +{ +#define QDR_SETUP_FALLBACK_BUFFER_SIZE 256 + char buffer[QDR_SETUP_FALLBACK_BUFFER_SIZE]; + char *alt_text = buffer; + bool buffer_on_heap = false; + + char *address_text = (char*) qd_hash_key_by_handle(addr->hash_handle); + size_t alt_length = strlen(address_text) + 1; + + // + // If this is a fallback address for a primary address that hasn't been seen + // yet, simply exit without doing anything. + // + if (address_text[1] == QD_ITER_HASH_PHASE_FALLBACK) + return; + + if (alt_length > QDR_SETUP_FALLBACK_BUFFER_SIZE) { + alt_text = (char*) malloc(alt_length); + buffer_on_heap = true; + } + + strcpy(alt_text, address_text); + alt_text[1] = QD_ITER_HASH_PHASE_FALLBACK; + + qd_iterator_t *alt_iter = qd_iterator_string(alt_text, ITER_VIEW_ALL); + qdr_address_t *alt_addr = 0; + + qd_hash_retrieve(core->addr_hash, alt_iter, (void**) &alt_addr); + if (!alt_addr) { + alt_addr = qdr_address_CT(core, QD_TREATMENT_ANYCAST_BALANCED, 0); + qd_hash_insert(core->addr_hash, alt_iter, alt_addr, &alt_addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, alt_addr); + } + + assert(alt_addr != addr); + assert(alt_addr->fallback_for == 0); + addr->fallback = alt_addr; + alt_addr->fallback_for = addr; + + qd_iterator_free(alt_iter); + if (buffer_on_heap) + free(alt_text); +} + + void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr) { qd_iterator_t *pattern = qd_iterator_string(addr->pattern, ITER_VIEW_ALL); @@ -778,6 +845,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, stats->deliveries_egress_route_container = core->deliveries_egress_route_container; stats->deliveries_delayed_1sec = core->deliveries_delayed_1sec; stats->deliveries_delayed_1sec = core->deliveries_delayed_10sec; + stats->deliveries_redirected_to_fallback = core->deliveries_redirected; } qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response); work->stats_handler = action->args.stats_request.handler; diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 9cdb25f..ef137d6 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -444,6 +444,7 @@ struct qdr_link_t { bool edge; ///< True if this link is in an edge-connection bool processing; ///< True if an IO thread is currently handling this link bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing + bool fallback; ///< True if this link is attached to a fallback destination for an address char *strip_prefix; char *insert_prefix; bool terminus_survives_disconnect; @@ -511,6 +512,12 @@ struct qdr_address_t { uint64_t cost_epoch; // + // State for tracking fallback destinations for undeliverable deliveries + // + qdr_address_t *fallback; ///< Pointer to this address's fallback destination + qdr_address_t *fallback_for; ///< Pointer to the address that this is a fallback for + + // // State for "closest" treatment // qd_bitmask_t *closest_remotes; @@ -541,6 +548,7 @@ struct qdr_address_t { uint64_t deliveries_from_container; uint64_t deliveries_egress_route_container; uint64_t deliveries_ingress_route_container; + uint64_t deliveries_redirected; ///@} @@ -557,6 +565,7 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_link_t *link); void qdr_core_bind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn); void qdr_core_unbind_address_conn_CT(qdr_core_t *core, qdr_address_t *addr, qdr_connection_t *conn); +void qdr_setup_fallback_address_CT(qdr_core_t *core, qdr_address_t *addr); struct qdr_address_config_t { DEQ_LINKS(qdr_address_config_t); @@ -564,6 +573,7 @@ struct qdr_address_config_t { uint64_t identity; uint32_t ref_count; char *pattern; + bool fallback; bool is_prefix; qd_address_treatment_t treatment; int in_phase; @@ -717,6 +727,7 @@ struct qdr_auto_link_t { qdr_auto_link_state_t state; qdr_core_timer_t *retry_timer; // If the auto link attach fails or gets disconnected, this timer retries the attach. char *last_error; + bool fallback; // True iff this auto-link attaches to a fallback destination for an address. }; DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t); @@ -838,19 +849,20 @@ struct qdr_core_t { qdr_delivery_cleanup_list_t delivery_cleanup_list; ///< List of delivery cleanup items to be processed in an IO thread // Overall delivery counters - uint64_t presettled_deliveries; - uint64_t dropped_presettled_deliveries; - uint64_t accepted_deliveries; - uint64_t rejected_deliveries; - uint64_t released_deliveries; - uint64_t modified_deliveries; - uint64_t deliveries_ingress; - uint64_t deliveries_egress; - uint64_t deliveries_transit; - uint64_t deliveries_egress_route_container; - uint64_t deliveries_ingress_route_container; - uint64_t deliveries_delayed_1sec; - uint64_t deliveries_delayed_10sec; + uint64_t presettled_deliveries; + uint64_t dropped_presettled_deliveries; + uint64_t accepted_deliveries; + uint64_t rejected_deliveries; + uint64_t released_deliveries; + uint64_t modified_deliveries; + uint64_t deliveries_ingress; + uint64_t deliveries_egress; + uint64_t deliveries_transit; + uint64_t deliveries_egress_route_container; + uint64_t deliveries_ingress_route_container; + uint64_t deliveries_delayed_1sec; + uint64_t deliveries_delayed_10sec; + uint64_t deliveries_redirected; qdr_edge_conn_addr_t edge_conn_addr; void *edge_context; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 1563510..f23f0b9 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -386,6 +386,9 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar */ static long qdr_addr_path_count_CT(qdr_address_t *addr) { + if (!addr) + return 0; + long rc = ((long) DEQ_SIZE(addr->subscriptions) + (long) DEQ_SIZE(addr->rlinks) + (long) qd_bitmask_cardinality(addr->rnodes)); @@ -400,20 +403,23 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery { qdr_link_t *dlv_link = qdr_delivery_link(dlv); + assert(dlv_link == link); + if (!dlv_link) return; - if (dlv_link->link_type == QD_LINK_ENDPOINT) + if (dlv_link->link_type == QD_LINK_ENDPOINT && !dlv_link->fallback) core->deliveries_ingress++; - if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) { + if (addr + && addr == link->owning_addr + && qdr_addr_path_count_CT(addr) == 0 + && (link->fallback || qdr_addr_path_count_CT(addr->fallback) == 0)) { // // We are trying to forward a delivery on an address that has no outbound paths // AND the incoming link is targeted (not anonymous). // - // We shall release the delivery (it is currently undeliverable). If the distribution is - // multicast or it's on an edge connection, we will replenish the credit. Otherwise, we - // will allow the credit to drain. + // We shall release the delivery (it is currently undeliverable). // if (dlv->settled) { // Increment the presettled_dropped_deliveries on the in_link @@ -437,6 +443,10 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery qdr_link_issue_credit_CT(core, link, 0, true); } + // + // If the distribution is multicast or it's on an edge connection, we will replenish the credit. + // Otherwise, we will allow the credit to drain. + // if (link->edge || qdr_is_addr_treatment_multicast(link->owning_addr)) qdr_link_issue_credit_CT(core, link, 1, false); else @@ -453,7 +463,8 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery if (addr) { fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL); if (link->link_type != QD_LINK_CONTROL && link->link_type != QD_LINK_ROUTER) { - addr->deliveries_ingress++; + if (!link->fallback) + addr->deliveries_ingress++; if (qdr_connection_route_container(link->conn)) { addr->deliveries_ingress_route_container++; @@ -463,6 +474,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery } link->total_deliveries++; } + // // There is no address that we can send this delivery to, which means the addr was not found in our hastable. This // can be because there were no receivers or because the address was not defined in the config file. @@ -472,6 +484,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery dlv->disposition = PN_REJECTED; dlv->error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot be sent to an unavailable address"); qdr_delivery_push_CT(core, dlv); + // // We will not detach this link because this could be anonymous sender. We don't know // which address the sender will be sending to next @@ -487,9 +500,26 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery if (fanout == 0 && !dlv->multicast && link->owning_addr == 0 && dlv->to_addr != 0) { if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) { qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context); - if (sender_address && sender_address != addr) { + if (sender_address && sender_address != addr) fanout += qdr_forward_message_CT(core, sender_address, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL); - } + } + } + + // + // If the fanout is still zero, check to see if there is a fallback address and + // route via the fallback if present. Don't do fallback forwarding if this link is + // itself associated with a fallback destination. + // + if (fanout == 0 && !!addr && !!addr->fallback && !link->fallback) { + const char *key = (const char*) qd_hash_key_by_handle(addr->fallback->hash_handle); + qd_composed_field_t *to_field = qd_compose_subfield(0); + qd_compose_insert_string(to_field, key + 2); + qd_message_set_to_override_annotation(dlv->msg, to_field); + qd_message_set_phase_annotation(dlv->msg, key[1] - '0'); + fanout = qdr_forward_message_CT(core, addr->fallback, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL); + if (fanout > 0) { + addr->deliveries_redirected++; + core->deliveries_redirected++; } } @@ -692,6 +722,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis free_qdr_link_ref_t(temp_rlink); } } else { + assert(false); // // Take the action reference and use it for undelivered. Don't decref/incref. // @@ -801,13 +832,7 @@ void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_ad */ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) { - // - // If there aren't any inlinks, there's no point in proceeding. - // - if (DEQ_SIZE(addr->inlinks) == 0) - return; - - if (qdr_addr_path_count_CT(addr) == 1) { + if (qdr_addr_path_count_CT(addr) == 1 || (!!addr->fallback && qdr_addr_path_count_CT(addr->fallback) == 1)) { qdr_link_ref_t *ref = DEQ_HEAD(addr->inlinks); while (ref) { qdr_link_t *link = ref->link; @@ -825,5 +850,8 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) ref = DEQ_NEXT(ref); } + + if (!!addr->fallback_for) + qdr_addr_start_inlinks_CT(core, addr->fallback_for); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9fa941e..2fcff52 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -126,6 +126,7 @@ foreach(py_test_module system_tests_address_lookup system_tests_multi_phase system_tests_multicast + system_tests_fallback_dest ) add_test(${py_test_module} ${TEST_WRAP} unit2 -v ${py_test_module}) diff --git a/tests/policy-3/test-sender-receiver-limits.json b/tests/policy-3/test-sender-receiver-limits.json index 0668800..3244e9a 100644 --- a/tests/policy-3/test-sender-receiver-limits.json +++ b/tests/policy-3/test-sender-receiver-limits.json @@ -92,5 +92,51 @@ } } } + ], + ["vhost", { + "hostname": "capabilities1.host.com", + "maxConnections": 50, + "maxConnectionsPerUser": 2, + "maxConnectionsPerHost": 4, + "allowUnknownUser": true, + "groups": { + "$default" : { + "remoteHosts": "*", + "maxFrameSize": 222222, + "maxMessageSize": 222222, + "maxSessionWindow": 222222, + "maxSessions": 2, + "maxSenders": 3, + "maxReceivers": 5, + "allowWaypointLinks": true, + "allowFallbackLinks": false, + "sources": "*", + "targets": "*" + } + } + } + ], + ["vhost", { + "hostname": "capabilities2.host.com", + "maxConnections": 50, + "maxConnectionsPerUser": 2, + "maxConnectionsPerHost": 4, + "allowUnknownUser": true, + "groups": { + "$default" : { + "remoteHosts": "*", + "maxFrameSize": 222222, + "maxMessageSize": 222222, + "maxSessionWindow": 222222, + "maxSessions": 2, + "maxSenders": 3, + "maxReceivers": 5, + "allowWaypointLinks": false, + "allowFallbackLinks": true, + "sources": "*", + "targets": "*" + } + } + } ] ] diff --git a/tests/system_tests_fallback_dest.py b/tests/system_tests_fallback_dest.py new file mode 100644 index 0000000..2915aac --- /dev/null +++ b/tests/system_tests_fallback_dest.py @@ -0,0 +1,781 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from time import sleep +from threading import Event +from threading import Timer + +import unittest2 as unittest +from proton import Message, Timeout, symbol +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_test import QdManager +from system_tests_link_routes import ConnLinkRouteService +from proton.handlers import MessagingHandler +from proton.reactor import Container, DynamicNodeProperties +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node +from subprocess import PIPE, STDOUT +import re + + +class AddrTimer(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.check_address() + + +class RouterTest(TestCase): + + inter_router_port = None + + @classmethod + def setUpClass(cls): + """Start a router""" + super(RouterTest, cls).setUpClass() + + def router(name, mode, connection, extra=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'route-container', 'name': 'WP'}), + ('address', {'prefix': 'dest', 'enableFallback': 'yes'}), + ('autoLink', {'connection': 'WP', 'address': 'dest.al', 'dir': 'out', 'fallback': 'yes'}), + ('autoLink', {'connection': 'WP', 'address': 'dest.al', 'dir': 'in', 'fallback': 'yes'}), + connection + ] + + if extra: + config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + edge_port_A = cls.tester.get_port() + edge_port_B = cls.tester.get_port() + + router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_A})) + router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_B})) + router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EA2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A})) + router('EB1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + router('EB2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B})) + + cls.routers[0].wait_router_connected('INT.B') + cls.routers[1].wait_router_connected('INT.A') + + + def test_01_sender_first_primary_same_interior(self): + test = SenderFirstTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.01', False) + test.run() + self.assertEqual(None, test.error) + + def test_02_sender_first_fallback_same_interior(self): + test = SenderFirstTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.02', True) + test.run() + self.assertEqual(None, test.error) + + def test_03_sender_first_primary_same_edge(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'dest.03', False) + test.run() + self.assertEqual(None, test.error) + + def test_04_sender_first_fallback_same_edge(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'dest.04', True) + test.run() + self.assertEqual(None, test.error) + + def test_05_sender_first_primary_interior_interior(self): + test = SenderFirstTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'dest.05', False) + test.run() + self.assertEqual(None, test.error) + + def test_06_sender_first_fallback_interior_interior(self): + test = SenderFirstTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'dest.06', True) + test.run() + self.assertEqual(None, test.error) + + def test_07_sender_first_primary_edge_interior(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + 'dest.07', False) + test.run() + self.assertEqual(None, test.error) + + def test_08_sender_first_fallback_edge_interior(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + 'dest.08', True) + test.run() + self.assertEqual(None, test.error) + + def test_09_sender_first_primary_interior_edge(self): + test = SenderFirstTest(self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'dest.09', False) + test.run() + self.assertEqual(None, test.error) + + def test_10_sender_first_fallback_interior_edge(self): + test = SenderFirstTest(self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'dest.10', True) + test.run() + self.assertEqual(None, test.error) + + def test_11_sender_first_primary_edge_edge(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[4].addresses[0], + 'dest.11', False) + test.run() + self.assertEqual(None, test.error) + + def test_12_sender_first_fallback_edge_edge(self): + test = SenderFirstTest(self.routers[2].addresses[0], + self.routers[4].addresses[0], + 'dest.12', True) + test.run() + self.assertEqual(None, test.error) + + def test_13_receiver_first_primary_same_interior(self): + test = ReceiverFirstTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.13', False) + test.run() + self.assertEqual(None, test.error) + + def test_14_receiver_first_fallback_same_interior(self): + test = ReceiverFirstTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.14', True) + test.run() + self.assertEqual(None, test.error) + + def test_15_receiver_first_primary_same_edge(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'dest.15', False) + test.run() + self.assertEqual(None, test.error) + + def test_16_receiver_first_fallback_same_edge(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'dest.16', True) + test.run() + self.assertEqual(None, test.error) + + def test_17_receiver_first_primary_interior_interior(self): + test = ReceiverFirstTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'dest.17', False) + test.run() + self.assertEqual(None, test.error) + + def test_18_receiver_first_fallback_interior_interior(self): + test = ReceiverFirstTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'dest.18', True) + test.run() + self.assertEqual(None, test.error) + + def test_19_receiver_first_primary_edge_interior(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + 'dest.19', False) + test.run() + self.assertEqual(None, test.error) + + def test_20_receiver_first_fallback_edge_interior(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + 'dest.20', True) + test.run() + self.assertEqual(None, test.error) + + def test_21_receiver_first_primary_interior_edge(self): + test = ReceiverFirstTest(self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'dest.21', False) + test.run() + self.assertEqual(None, test.error) + + def test_22_receiver_first_fallback_interior_edge(self): + test = ReceiverFirstTest(self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'dest.22', True) + test.run() + self.assertEqual(None, test.error) + + def test_23_receiver_first_primary_edge_edge(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[4].addresses[0], + 'dest.23', False) + test.run() + self.assertEqual(None, test.error) + + def test_24_receiver_first_fallback_edge_edge(self): + test = ReceiverFirstTest(self.routers[2].addresses[0], + self.routers[4].addresses[0], + 'dest.24', True) + test.run() + self.assertEqual(None, test.error) + + def test_25_switchover_same_edge(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + self.routers[2].addresses[0], + 'dest.25') + test.run() + self.assertEqual(None, test.error) + + def test_26_switchover_same_interior(self): + test = SwitchoverTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.26') + test.run() + self.assertEqual(None, test.error) + + def test_27_switchover_local_edge_alt_remote_interior(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[0].addresses[0], + self.routers[2].addresses[0], + 'dest.27') + test.run() + self.assertEqual(None, test.error) + + def test_28_switchover_local_edge_alt_remote_edge(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[4].addresses[0], + self.routers[2].addresses[0], + 'dest.28') + test.run() + self.assertEqual(None, test.error) + + def test_29_switchover_local_edge_pri_remote_interior(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + self.routers[0].addresses[0], + 'dest.29') + test.run() + self.assertEqual(None, test.error) + + def test_30_switchover_local_interior_pri_remote_edge(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[2].addresses[0], + self.routers[4].addresses[0], + 'dest.30') + test.run() + self.assertEqual(None, test.error) + + def test_31_switchover_local_interior_alt_remote_interior(self): + test = SwitchoverTest(self.routers[1].addresses[0], + self.routers[0].addresses[0], + self.routers[1].addresses[0], + 'dest.31') + test.run() + self.assertEqual(None, test.error) + + def test_32_switchover_local_interior_alt_remote_edge(self): + test = SwitchoverTest(self.routers[1].addresses[0], + self.routers[3].addresses[0], + self.routers[1].addresses[0], + 'dest.32') + test.run() + self.assertEqual(None, test.error) + + def test_33_switchover_local_interior_pri_remote_interior(self): + test = SwitchoverTest(self.routers[1].addresses[0], + self.routers[1].addresses[0], + self.routers[0].addresses[0], + 'dest.33') + test.run() + self.assertEqual(None, test.error) + + def test_34_switchover_local_interior_pri_remote_edge(self): + test = SwitchoverTest(self.routers[1].addresses[0], + self.routers[1].addresses[0], + self.routers[4].addresses[0], + 'dest.34') + test.run() + self.assertEqual(None, test.error) + + def test_35_switchover_mix_1(self): + test = SwitchoverTest(self.routers[0].addresses[0], + self.routers[1].addresses[0], + self.routers[2].addresses[0], + 'dest.35') + test.run() + self.assertEqual(None, test.error) + + def test_36_switchover_mix_2(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + self.routers[0].addresses[0], + 'dest.36') + test.run() + self.assertEqual(None, test.error) + + def test_37_switchover_mix_3(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[1].addresses[0], + self.routers[4].addresses[0], + 'dest.37') + test.run() + self.assertEqual(None, test.error) + + def test_38_switchover_mix_4(self): + test = SwitchoverTest(self.routers[2].addresses[0], + self.routers[3].addresses[0], + self.routers[4].addresses[0], + 'dest.38') + test.run() + self.assertEqual(None, test.error) + + def test_39_auto_link_sender_first_fallback_same_interior(self): + test = SenderFirstAutoLinkTest(self.routers[0].addresses[0], + self.routers[0].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_40_auto_link_sender_first_fallback_same_edge(self): + test = SenderFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[2].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_41_auto_link_sender_first_fallback_interior_interior(self): + test = SenderFirstAutoLinkTest(self.routers[0].addresses[0], + self.routers[1].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_42_auto_link_sender_first_fallback_edge_interior(self): + test = SenderFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[0].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_43_auto_link_sender_first_fallback_interior_edge(self): + test = SenderFirstAutoLinkTest(self.routers[1].addresses[0], + self.routers[2].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_44_auto_link_sender_first_fallback_edge_edge(self): + test = SenderFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[4].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_45_auto_link_receiver_first_fallback_same_interior(self): + test = ReceiverFirstAutoLinkTest(self.routers[0].addresses[0], + self.routers[0].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_46_auto_link_receiver_first_fallback_same_edge(self): + test = ReceiverFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[2].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_47_auto_link_receiver_first_fallback_interior_interior(self): + test = ReceiverFirstAutoLinkTest(self.routers[0].addresses[0], + self.routers[1].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_48_auto_link_receiver_first_fallback_edge_interior(self): + test = ReceiverFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[1].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_49_auto_link_receiver_first_fallback_interior_edge(self): + test = ReceiverFirstAutoLinkTest(self.routers[1].addresses[0], + self.routers[2].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + def test_50_auto_link_receiver_first_fallback_edge_edge(self): + test = ReceiverFirstAutoLinkTest(self.routers[2].addresses[0], + self.routers[4].addresses[1]) + test.run() + self.assertEqual(None, test.error) + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class SenderFirstTest(MessagingHandler): + def __init__(self, sender_host, receiver_host, addr, rx_fallback): + super(SenderFirstTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.addr = addr + self.rx_fallback = rx_fallback + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_rel = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) + self.sender_conn.close() + self.receiver_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.receiver_conn = event.container.connect(self.receiver_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + + def on_link_opened(self, event): + if event.sender == self.sender: + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + if self.rx_fallback: + self.receiver.source.capabilities.put_symbol("qd.fallback") + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + self.n_rx += 1 + if self.n_rx == self.count: + self.fail(None) + + def on_released(self, event): + self.n_rel += 1 + + def run(self): + Container(self).run() + + +class ReceiverFirstTest(MessagingHandler): + def __init__(self, sender_host, receiver_host, addr, rx_fallback): + super(ReceiverFirstTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.addr = addr + self.rx_fallback = rx_fallback + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_rel = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) + self.sender_conn.close() + self.receiver_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.receiver_conn = event.container.connect(self.receiver_host) + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + if self.rx_fallback: + self.receiver.source.capabilities.put_symbol("qd.fallback") + + def on_link_opened(self, event): + if event.receiver == self.receiver: + self.sender = event.container.create_sender(self.sender_conn, self.addr) + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + self.n_rx += 1 + if self.n_rx == self.count: + self.fail(None) + + def on_released(self, event): + self.n_rel += 1 + + def run(self): + Container(self).run() + + +class SwitchoverTest(MessagingHandler): + def __init__(self, sender_host, primary_host, fallback_host, addr): + super(SwitchoverTest, self).__init__() + self.sender_host = sender_host + self.primary_host = primary_host + self.fallback_host = fallback_host + self.addr = addr + self.count = 300 + + self.sender_conn = None + self.primary_conn = None + self.fallback_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_rel = 0 + self.phase = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d, phase=%d" % (self.n_tx, self.n_rx, self.n_rel, self.phase) + self.sender_conn.close() + self.primary_conn.close() + self.fallback_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.primary_conn.close() + self.fallback_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.primary_conn = event.container.connect(self.primary_host) + self.fallback_conn = event.container.connect(self.fallback_host) + self.primary_receiver = event.container.create_receiver(self.primary_conn, self.addr) + self.fallback_receiver = event.container.create_receiver(self.primary_conn, self.addr, name=self.addr) + self.fallback_receiver.source.capabilities.put_object(symbol("qd.fallback")) + + def on_link_opened(self, event): + if event.receiver == self.primary_receiver: + self.sender = event.container.create_sender(self.sender_conn, self.addr) + + def on_link_closed(self, event): + if event.receiver == self.primary_receiver: + self.n_rx = 0 + self.n_tx = 0 + self.send() + + def send(self): + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_sendable(self, event): + if event.sender == self.sender: + self.send() + + def on_message(self, event): + self.n_rx += 1 + if self.n_rx == self.count: + if self.phase == 0: + self.phase = 1 + self.primary_receiver.close() + else: + self.fail(None) + + def on_released(self, event): + self.n_rel += 1 + self.n_tx -= 1 + + def run(self): + Container(self).run() + + +class SenderFirstAutoLinkTest(MessagingHandler): + def __init__(self, sender_host, receiver_host): + super(SenderFirstAutoLinkTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.addr = "dest.al" + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_rel = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) + self.sender_conn.close() + self.receiver_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.sender_conn = event.container.connect(self.sender_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + + def on_link_opening(self, event): + if event.sender: + self.alt_sender = event.sender + event.sender.source.address = self.addr + event.sender.open() + + elif event.receiver: + self.alt_receiver = event.receiver + event.receiver.target.address = self.addr + event.receiver.open() + + def on_link_opened(self, event): + if event.sender == self.sender: + self.receiver_conn = event.container.connect(self.receiver_host) + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + self.n_rx += 1 + if self.n_rx == self.count: + self.fail(None) + + def on_released(self, event): + self.n_rel += 1 + self.n_tx -= 1 + + def run(self): + Container(self).run() + + +class ReceiverFirstAutoLinkTest(MessagingHandler): + def __init__(self, sender_host, receiver_host): + super(ReceiverFirstAutoLinkTest, self).__init__() + self.sender_host = sender_host + self.receiver_host = receiver_host + self.addr = "dest.al" + self.count = 300 + + self.sender_conn = None + self.receiver_conn = None + self.alt_receiver = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.n_rel = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, n_rel=%d" % (self.n_tx, self.n_rx, self.n_rel) + self.sender_conn.close() + self.receiver_conn.close() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(10.0, Timeout(self)) + self.receiver_conn = event.container.connect(self.receiver_host) + + def on_link_opening(self, event): + if event.sender: + self.alt_sender = event.sender + event.sender.source.address = self.addr + event.sender.open() + + elif event.receiver: + self.alt_receiver = event.receiver + event.receiver.target.address = self.addr + event.receiver.open() + + def on_link_opened(self, event): + if event.receiver == self.alt_receiver: + self.sender_conn = event.container.connect(self.sender_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + self.n_rx += 1 + if self.n_rx == self.count: + self.fail(None) + + def on_released(self, event): + self.n_rel += 1 + self.n_tx -= 1 + + def run(self): + Container(self).run() + + +if __name__== '__main__': + unittest.main(main_module()) diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py index 7b445a9..561aed2 100644 --- a/tests/system_tests_http.py +++ b/tests/system_tests_http.py @@ -172,6 +172,7 @@ class RouterTestHttp(TestCase): assert('deliveries_ingress' in data) assert('deliveries_delayed_1sec' in data) assert('deliveries_delayed_10sec' in data) + assert('deliveries_redirected_to_fallback' in data) # Sequential calls on multiple ports for port in r.ports: test(port) diff --git a/tests/system_tests_policy.py b/tests/system_tests_policy.py index 422c8f4..1f08f05 100644 --- a/tests/system_tests_policy.py +++ b/tests/system_tests_policy.py @@ -28,9 +28,9 @@ import time from system_test import TestCase, Qdrouterd, main_module, Process, TIMEOUT, DIR from subprocess import PIPE, STDOUT -from proton import ConnectionException, Timeout, Url +from proton import ConnectionException, Timeout, Url, symbol from proton.handlers import MessagingHandler -from proton.reactor import Container +from proton.reactor import Container, ReceiverOption from proton.utils import BlockingConnection, LinkDetached, SyncRequestResponse from qpid_dispatch_internal.policy.policy_util import is_ipv6_enabled from qpid_dispatch_internal.compat import dict_iteritems @@ -367,6 +367,45 @@ class PolicyVhostOverride(TestCase): bs1.close() +class Capabilities(ReceiverOption): + def __init__(self, value): + self.value = value + + def apply(self, receiver): + receiver.source.capabilities.put_object(symbol(self.value)) + + +class PolicyTerminusCapabilities(TestCase): + """ + Verify that specifying a policy folder from the router conf file + effects loading the policies in that folder. + This test relies on qdmanage utility. + """ + @classmethod + def setUpClass(cls): + """Start the router""" + super(PolicyTerminusCapabilities, cls).setUpClass() + policy_config_path = os.path.join(DIR, 'policy-3') + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR.Policy'}), + ('policy', {'maxConnections': 2, 'policyDir': policy_config_path, 'enableVhostPolicy': 'true'}), + ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'capabilities1.host.com'}), + ('listener', {'port': cls.tester.get_port(), 'policyVhost': 'capabilities2.host.com'}) + ]) + + cls.router = cls.tester.qdrouterd('PolicyTerminusCapabilities', config, wait=True) + + def test_forbid_waypoint(self): + br1 = BlockingConnection(self.router.addresses[1]) + self.assertRaises(LinkDetached, br1.create_receiver, address="ok1", options=Capabilities('qd.waypoint_1')) + br1.close() + + def test_forbid_fallback(self): + br1 = BlockingConnection(self.router.addresses[0]) + self.assertRaises(LinkDetached, br1.create_receiver, address="ok2", options=Capabilities('qd.fallback')) + br1.close() + + class InterrouterLinksAllowed(TestCase): inter_router_port = None diff --git a/tools/qdstat.in b/tools/qdstat.in index e0ec094..73a1dea 100755 --- a/tools/qdstat.in +++ b/tools/qdstat.in @@ -326,6 +326,7 @@ class BusManager(Node): try: rows.append(('Deliveries Delayed > 1sec', router.deliveriesDelayed1Sec)) rows.append(('Deliveries Delayed > 10sec', router.deliveriesDelayed10Sec)) + rows.append(('Deliveries to Fallback', router.deliveriesRedirectedToFallback)) except: pass rows.append(('Ingress Count', router.deliveriesIngress)) @@ -492,7 +493,7 @@ class BusManager(Node): rows = [] cols = ('distribution', 'inProcess', 'subscriberCount', 'remoteCount', - 'containerCount', 'deliveriesIngress', 'deliveriesEgress', + 'containerCount', 'deliveriesIngress', 'deliveriesEgress', 'deliveriesRedirectedToFallback', 'deliveriesTransit', 'deliveriesToContainer', 'deliveriesFromContainer', 'name', 'priority') objects = self.query('org.apache.qpid.dispatch.router.address', cols, limit=self.opts.limit) @@ -514,6 +515,7 @@ class BusManager(Node): heads.append(Header("in", Header.COMMAS)) heads.append(Header("out", Header.COMMAS)) heads.append(Header("thru", Header.COMMAS)) + heads.append(Header("fallback", Header.COMMAS)) if self.opts.verbose: heads.append(Header("to-proc", Header.COMMAS)) heads.append(Header("from-proc", Header.COMMAS)) @@ -533,6 +535,10 @@ class BusManager(Node): row.append(addr.deliveriesIngress) row.append(addr.deliveriesEgress) row.append(addr.deliveriesTransit) + try: + row.append(addr.deliveriesRedirectedToFallback) + except: + row.append("-") if self.opts.verbose: row.append(addr.deliveriesToContainer) row.append(addr.deliveriesFromContainer) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org