This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new ef4fba8 DISPATCH-1463 - Added a facility for detecting stuck deliveries in the router. Added a facility for the core thread to process background actions when there are no normal actions available. Added counters to router and link for currently-stuck deliveries. Added a core module to periodically check for long-unsettled (stuck) deliveries on links. Added a test set for stuck-delivery detection. Exposed the new deliveries_stuck metric to the HTTP-accessible global [...] ef4fba8 is described below commit ef4fba8b25bfa54d54c76d291cdc2493c6183ad3 Author: Ted Ross <tr...@redhat.com> AuthorDate: Fri Oct 25 12:43:36 2019 -0400 DISPATCH-1463 - Added a facility for detecting stuck deliveries in the router. Added a facility for the core thread to process background actions when there are no normal actions available. Added counters to router and link for currently-stuck deliveries. Added a core module to periodically check for long-unsettled (stuck) deliveries on links. 
Added a test set for stuck-delivery detection Exposed the new deliveries_stuck metric to the HTTP-accessible global metrics This closes #600 --- include/qpid/dispatch/router_core.h | 1 + python/qpid_dispatch/management/qdrouter.json | 10 + src/CMakeLists.txt | 1 + src/http-libwebsockets.c | 2 + src/router_core/agent_link.c | 12 +- src/router_core/agent_link.h | 2 +- src/router_core/agent_router.c | 18 +- src/router_core/agent_router.h | 2 +- src/router_core/delivery.c | 8 + src/router_core/delivery.h | 1 + .../stuck_delivery_detection/delivery_tracker.c | 166 +++++++++++++ src/router_core/router_core.c | 11 + src/router_core/router_core_private.h | 4 + src/router_core/router_core_thread.c | 34 ++- tests/CMakeLists.txt | 1 + tests/system_test.py | 4 + tests/system_tests_stuck_deliveries.py | 274 +++++++++++++++++++++ tools/qdstat.in | 16 +- 18 files changed, 549 insertions(+), 18 deletions(-) diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 1b2702b..5520888 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -854,6 +854,7 @@ typedef struct { size_t deliveries_egress_route_container; size_t deliveries_delayed_1sec; size_t deliveries_delayed_10sec; + size_t deliveries_stuck; size_t deliveries_redirected_to_fallback; } qdr_global_stats_t; ALLOC_DECLARE(qdr_global_stats_t); diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index e297d58..7e54bc0 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -570,6 +570,11 @@ "graph": true, "description": "The total number of settled deliveries that were held in the router for more than 10 seconds." }, + "deliveriesStuck": { + "type": "integer", + "graph": true, + "description": "The current number of deliveries that are unsettled and have been held in the router for more than 10 seconds." 
+ }, "deliveriesIngress": { "type": "integer", "description":"Number of deliveries that were sent to it by a sender that is directly attached to the router.", @@ -1524,6 +1529,11 @@ "graph": true, "description": "The total number of settled deliveries that were held in the router for more than 10 seconds." }, + "deliveriesStuck": { + "type": "integer", + "graph": true, + "description": "The current number of deliveries that are unsettled and have been held in the router for more than 10 seconds." + }, "settleRate": { "type": "integer", "graph": true, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b70dde2..d57df77 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -99,6 +99,7 @@ set(qpid_dispatch_SOURCES router_core/modules/edge_addr_tracking/edge_addr_tracking.c router_core/modules/address_lookup_server/address_lookup_server.c router_core/modules/address_lookup_client/lookup_client.c + router_core/modules/stuck_delivery_detection/delivery_tracker.c router_node.c router_pynode.c schema_enum.c diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index ffdbd28..dda8b9d 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -471,6 +471,7 @@ static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stat static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return stats->deliveries_egress_route_container; } static int stats_get_deliveries_delayed_1sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_1sec; } static int stats_get_deliveries_delayed_10sec(qdr_global_stats_t *stats) { return stats->deliveries_delayed_10sec; } +static int stats_get_deliveries_stuck(qdr_global_stats_t *stats) { return stats->deliveries_stuck; } static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t *stats) { return stats->deliveries_redirected_to_fallback; } static struct metric_definition metrics[] = { @@ -493,6 +494,7 @@ static struct metric_definition metrics[] = 
{ {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container}, {"deliveries_delayed_1sec", "counter", stats_get_deliveries_delayed_1sec}, {"deliveries_delayed_10sec", "counter", stats_get_deliveries_delayed_10sec}, + {"deliveries_stuck", "counter", stats_get_deliveries_stuck}, {"deliveries_redirected_to_fallback", "counter", stats_get_deliveries_redirected_to_fallback} }; static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]); diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index 92d3ab6..cb6e016 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -44,9 +44,10 @@ #define QDR_LINK_MODIFIED_COUNT 20 #define QDR_LINK_DELAYED_1SEC 21 #define QDR_LINK_DELAYED_10SEC 22 -#define QDR_LINK_INGRESS_HISTOGRAM 23 -#define QDR_LINK_PRIORITY 24 -#define QDR_LINK_SETTLE_RATE 25 +#define QDR_LINK_DELIVERIES_STUCK 23 +#define QDR_LINK_INGRESS_HISTOGRAM 24 +#define QDR_LINK_PRIORITY 25 +#define QDR_LINK_SETTLE_RATE 26 const char *qdr_link_columns[] = {"name", @@ -72,6 +73,7 @@ const char *qdr_link_columns[] = "modifiedCount", "deliveriesDelayed1Sec", "deliveriesDelayed10Sec", + "deliveriesStuck", "ingressHistogram", "priority", "settleRate", @@ -230,6 +232,10 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod qd_compose_insert_ulong(body, link->deliveries_delayed_10sec); break; + case QDR_LINK_DELIVERIES_STUCK: + qd_compose_insert_ulong(body, link->deliveries_stuck); + break; + case QDR_LINK_INGRESS_HISTOGRAM: if (link->ingress_histogram) { qd_compose_start_list(body); diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h index 80fa8a4..20b6c65 100644 --- a/src/router_core/agent_link.h +++ b/src/router_core/agent_link.h @@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t *core, qdr_query_t *query, qd_parsed_field_t *in_body); -#define QDR_LINK_COLUMN_COUNT 26 +#define QDR_LINK_COLUMN_COUNT 27 const char 
*qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1]; diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index 74f5b8d..7bdc934 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -46,12 +46,13 @@ #define QDR_ROUTER_MODIFIED_DELIVERIES 19 #define QDR_ROUTER_DELAYED_1SEC 20 #define QDR_ROUTER_DELAYED_10SEC 21 -#define QDR_ROUTER_DELIVERIES_INGRESS 22 -#define QDR_ROUTER_DELIVERIES_EGRESS 23 -#define QDR_ROUTER_DELIVERIES_TRANSIT 24 -#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 25 -#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 26 -#define QDR_ROUTER_DELIVERIES_REDIRECTED 27 +#define QDR_ROUTER_DELIVERIES_STUCK 22 +#define QDR_ROUTER_DELIVERIES_INGRESS 23 +#define QDR_ROUTER_DELIVERIES_EGRESS 24 +#define QDR_ROUTER_DELIVERIES_TRANSIT 25 +#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER 26 +#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER 27 +#define QDR_ROUTER_DELIVERIES_REDIRECTED 28 const char *qdr_router_columns[] = @@ -77,6 +78,7 @@ const char *qdr_router_columns[] = "modifiedDeliveries", "deliveriesDelayed1Sec", "deliveriesDelayed10Sec", + "deliveriesStuck", "deliveriesIngress", "deliveriesEgress", "deliveriesTransit", @@ -197,6 +199,10 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co qd_compose_insert_ulong(body, core->deliveries_delayed_10sec); break; + case QDR_ROUTER_DELIVERIES_STUCK: + qd_compose_insert_ulong(body, core->deliveries_stuck); + break; + case QDR_ROUTER_DELIVERIES_INGRESS: qd_compose_insert_ulong(body, core->deliveries_ingress); break; diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h index 860bf7a..7f0e271 100644 --- a/src/router_core/agent_router.h +++ b/src/router_core/agent_router.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -#define QDR_ROUTER_COLUMN_COUNT 28 +#define QDR_ROUTER_COLUMN_COUNT 29 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1]; diff --git a/src/router_core/delivery.c 
b/src/router_core/delivery.c index 3e21ed1..8d41d3f 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -379,6 +379,14 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive core->deliveries_delayed_1sec++; } + // + // If this delivery was marked as stuck, decrement the currently-stuck counters in the link and router. + // + if (delivery->stuck) { + link->deliveries_stuck--; + core->deliveries_stuck--; + } + if (qd_bitmask_valid_bit_value(delivery->ingress_index) && link->ingress_histogram) link->ingress_histogram[delivery->ingress_index]++; diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index 16fcc43..fe3c858 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -63,6 +63,7 @@ struct qdr_delivery_t { qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer. bool multicast; /// True if this delivery is targeted for a multicast address. bool via_edge; /// True if this delivery arrived via an edge-connection. + bool stuck; /// True if this delivery was counted as stuck. }; ALLOC_DECLARE(qdr_delivery_t); diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c new file mode 100644 index 0000000..eec3905 --- /dev/null +++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qpid/dispatch/ctools.h" +#include "module.h" +#include "delivery.h" +#include <stdio.h> +#include <inttypes.h> + +#define PROD_TIMER_INTERVAL 30 +#define PROD_STUCK_AGE 10 + +#define TEST_TIMER_INTERVAL 5 +#define TEST_STUCK_AGE 3 + +static int timer_interval = PROD_TIMER_INTERVAL; +static int stuck_age = PROD_STUCK_AGE; + +static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard); + +typedef struct tracker_t tracker_t; + +struct tracker_t { + qdr_core_t *core; + qdr_core_timer_t *timer; + qdr_link_t_sp next_link; +}; + + +static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) +{ + if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) { + dlv->stuck = true; + link->deliveries_stuck++; + core->deliveries_stuck++; + if (link->deliveries_stuck == 1) + qd_log(core->log, QD_LOG_INFO, + "[C%"PRIu64"][L%"PRIu64"] " + "Stuck delivery: At least one delivery on this link has been undelivered/unsettled for more than %d seconds", + link->conn ? 
link->conn->identity : 0, link->identity, stuck_age); + } +} + + +static void process_link_CT(qdr_core_t *core, qdr_link_t *link) +{ + qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); + while (dlv) { + check_delivery_CT(core, link, dlv); + dlv = DEQ_NEXT(dlv); + } + + dlv = DEQ_HEAD(link->unsettled); + while (dlv) { + check_delivery_CT(core, link, dlv); + dlv = DEQ_NEXT(dlv); + } +} + + +static void timer_handler_CT(qdr_core_t *core, void *context) +{ + tracker_t *tracker = (tracker_t*) context; + qdr_link_t *first_link = DEQ_HEAD(core->open_links); + + qd_log(core->log, QD_LOG_DEBUG, "Stuck Delivery Detection: Starting detection cycle"); + + if (!!first_link) { + set_safe_ptr_qdr_link_t(first_link, &tracker->next_link); + qdr_action_t *action = qdr_action(action_handler_CT, "detect_stuck_deliveries"); + action->args.general.context_1 = tracker; + qdr_action_background_enqueue(core, action); + } else + qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval); +} + + +static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (discard) + return; + + tracker_t *tracker = (tracker_t*) action->args.general.context_1; + qdr_link_t *link = safe_deref_qdr_link_t(tracker->next_link); + + if (!!link) { + process_link_CT(core, link); + qdr_link_t *next = DEQ_NEXT(link); + if (!!next) { + // + // There is another link on the list. Schedule another background action to process + // the next link. + // + set_safe_ptr_qdr_link_t(next, &tracker->next_link); + action = qdr_action(action_handler_CT, "detect_stuck_deliveries"); + action->args.general.context_1 = tracker; + qdr_action_background_enqueue(core, action); + } else + // + // We've come to the end of the list of open links. Set the timer to start a new sweep + // after the interval. + // + qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval); + } else + // + // The link we were provided is not valid. 
It was probably closed since the last time we + // came through this path. Abort the sweep and set the timer for a new one after the interval. + // + qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval); +} + + +static bool qdrc_delivery_tracker_enable_CT(qdr_core_t *core) +{ + if (core->qd->test_hooks) { + // + // Test hooks are enabled, override the timing constants with the test values + // + timer_interval = TEST_TIMER_INTERVAL; + stuck_age = TEST_STUCK_AGE; + } + + return true; +} + + +static void qdrc_delivery_tracker_init_CT(qdr_core_t *core, void **module_context) +{ + tracker_t *tracker = NEW(tracker_t); + ZERO(tracker); + tracker->core = core; + tracker->timer = qdr_core_timer_CT(core, timer_handler_CT, tracker); + qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval); + *module_context = tracker; + + qd_log(core->log, QD_LOG_INFO, + "Stuck delivery detection: Scan interval: %d seconds, Delivery age threshold: %d seconds", + timer_interval, stuck_age); +} + + +static void qdrc_delivery_tracker_final_CT(void *module_context) +{ + tracker_t *tracker = (tracker_t*) module_context; + qdr_core_timer_free_CT(tracker->core, tracker->timer); + free(tracker); +} + + +QDR_CORE_MODULE_DECLARE("stuck_delivery_detection", qdrc_delivery_tracker_enable_CT, qdrc_delivery_tracker_init_CT, qdrc_delivery_tracker_final_CT) diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 67c145e..e5994a6 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -65,6 +65,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, core->action_lock = sys_mutex(); core->running = true; DEQ_INIT(core->action_list); + DEQ_INIT(core->action_list_background); core->work_lock = sys_mutex(); DEQ_INIT(core->work_list); @@ -329,6 +330,15 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action) } +void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action) +{ + 
sys_mutex_lock(core->action_lock); + DEQ_INSERT_TAIL(core->action_list_background, action); + sys_cond_signal(core->action_cond); + sys_mutex_unlock(core->action_lock); +} + + qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config) { if (treatment == QD_TREATMENT_UNAVAILABLE) @@ -848,6 +858,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, stats->deliveries_egress_route_container = core->deliveries_egress_route_container; stats->deliveries_delayed_1sec = core->deliveries_delayed_1sec; stats->deliveries_delayed_10sec = core->deliveries_delayed_10sec; + stats->deliveries_stuck = core->deliveries_stuck; stats->deliveries_redirected_to_fallback = core->deliveries_redirected; } qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index e08c70a..fa2591b 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -458,6 +458,7 @@ struct qdr_link_t { uint64_t modified_deliveries; uint64_t deliveries_delayed_1sec; uint64_t deliveries_delayed_10sec; + uint64_t deliveries_stuck; uint64_t settled_deliveries[QDR_LINK_RATE_DEPTH]; uint64_t *ingress_histogram; uint8_t priority; @@ -756,6 +757,7 @@ struct qdr_core_t { sys_thread_t *thread; bool running; qdr_action_list_t action_list; + qdr_action_list_t action_list_background; /// Actions processed only when the action_list is empty sys_cond_t *action_cond; sys_mutex_t *action_lock; @@ -863,6 +865,7 @@ struct qdr_core_t { uint64_t deliveries_ingress_route_container; uint64_t deliveries_delayed_1sec; uint64_t deliveries_delayed_10sec; + uint64_t deliveries_stuck; uint64_t deliveries_redirected; qdr_edge_conn_addr_t edge_conn_addr; @@ -893,6 +896,7 @@ void qdr_agent_setup_CT(qdr_core_t *core); void qdr_forwarder_setup_CT(qdr_core_t *core); qdr_action_t *qdr_action(qdr_action_handler_t 
action_handler, const char *label); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); +void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bool drain); void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr); void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr); diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index 8a4682c..2ce0f09 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -123,6 +123,32 @@ void qdr_modules_finalize(qdr_core_t *core) } +/* + * router_core_process_background_action_LH + * + * Process up to one available background action. + * Return true iff an action was processed. + */ +static bool router_core_process_background_action_LH(qdr_core_t *core) +{ + qdr_action_t *action = DEQ_HEAD(core->action_list_background); + + if (!!action) { + DEQ_REMOVE_HEAD(core->action_list_background); + sys_mutex_unlock(core->action_lock); + if (action->label) + qd_log(core->log, QD_LOG_TRACE, "Core background action '%s'%s", action->label, core->running ? "" : " (discard)"); + action->action_handler(core, action, !core->running); + sys_mutex_lock(core->action_lock); + + free_qdr_action_t(action); + return true; + } + + return false; +} + + void *router_core_thread(void *arg) { qdr_core_t *core = (qdr_core_t*) arg; @@ -138,15 +164,17 @@ void *router_core_thread(void *arg) qd_log(core->log, QD_LOG_INFO, "Router Core thread running. 
%s/%s", core->router_area, core->router_id); while (core->running) { // - // Use the lock only to protect the condition variable and the action list + // Use the lock only to protect the condition variable and the action lists // sys_mutex_lock(core->action_lock); // // Block on the condition variable when there is no action to do // - while (core->running && DEQ_IS_EMPTY(core->action_list)) - sys_cond_wait(core->action_cond, core->action_lock); + while (core->running && DEQ_IS_EMPTY(core->action_list)) { + if (!router_core_process_background_action_LH(core)) + sys_cond_wait(core->action_cond, core->action_lock); + } // // Move the entire action list to a private list so we can process it without diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f7c86a1..78d2091 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -137,6 +137,7 @@ foreach(py_test_module system_tests_multicast system_tests_fallback_dest system_tests_router_mesh + system_tests_stuck_deliveries ) add_test(${py_test_module} ${TEST_WRAP} ${PYTHON_TEST_COMMAND} -v ${py_test_module}) diff --git a/tests/system_test.py b/tests/system_test.py index 39471eb..327ccf2 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -1034,6 +1034,10 @@ class MgmtMsgProxy(object): ap = msg.properties return self._Response(ap['statusCode'], ap['statusDescription'], msg.body) + def query_router(self): + ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.router'} + return Message(properties=ap, reply_to=self.reply_addr) + def query_connections(self): ap = {'operation': 'QUERY', 'type': 'org.apache.qpid.dispatch.connection'} return Message(properties=ap, reply_to=self.reply_addr) diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py new file mode 100644 index 0000000..c4f8b21 --- /dev/null +++ b/tests/system_tests_stuck_deliveries.py @@ -0,0 +1,274 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license 
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +from time import sleep +from threading import Event +from threading import Timer + +from proton import Message, Timeout, symbol +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_test import QdManager +from system_test import unittest +from system_tests_link_routes import ConnLinkRouteService +from proton.handlers import MessagingHandler +from proton.reactor import Container, DynamicNodeProperties +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node +from subprocess import PIPE, STDOUT +import re + + +class AddrTimer(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.check_address() + + +class RouterTest(TestCase): + + inter_router_port = None + + @classmethod + def setUpClass(cls): + """Start a router""" + super(RouterTest, cls).setUpClass() + + def router(name, mode, connection, extra=None, args=[]): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'port': 
cls.tester.get_port(), 'stripAnnotations': 'no'}), + connection + ] + + if extra: + config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True, cl_args=args)) + + cls.routers = [] + + inter_router_port = cls.tester.get_port() + edge_port_A = cls.tester.get_port() + edge_port_B = cls.tester.get_port() + + router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_A}), ["-T"]) + router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}), + ('listener', {'role': 'edge', 'port': edge_port_B}), ["-T"]) + router('EA1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A}), None, ["-T"]) + router('EA2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A}), None, ["-T"]) + router('EB1', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B}), None, ["-T"]) + router('EB2', 'edge', ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B}), None, ["-T"]) + + cls.routers[0].wait_router_connected('INT.B') + cls.routers[1].wait_router_connected('INT.A') + + + def test_01_delayed_settlement_same_interior(self): + test = DelayedSettlementTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.01', 10, [2], False) + test.run() + self.assertEqual(None, test.error) + + def test_02_delayed_settlement_different_edges_check_sender(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[2].addresses[0], + 'dest.02', 10, [2,3,8], False) + test.run() + self.assertEqual(None, test.error) + + def test_03_delayed_settlement_different_edges_check_receiver(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[5].addresses[0], + 'dest.03', 10, 
[2,4,9], False) + test.run() + self.assertEqual(None, test.error) + + def test_04_delayed_settlement_different_edges_check_interior(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + 'dest.04', 10, [0,2,3,8], False) + test.run() + self.assertEqual(None, test.error) + + def test_05_no_settlement_same_interior(self): + test = DelayedSettlementTest(self.routers[0].addresses[0], + self.routers[0].addresses[0], + self.routers[0].addresses[0], + 'dest.05', 10, [0,2,4,9], True) + test.run() + self.assertEqual(None, test.error) + + def test_06_no_settlement_different_edges_check_sender(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[2].addresses[0], + 'dest.06', 10, [9], True) + test.run() + self.assertEqual(None, test.error) + + def test_07_no_settlement_different_edges_check_receiver(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[5].addresses[0], + 'dest.07', 10, [0,9], True) + test.run() + self.assertEqual(None, test.error) + + def test_08_no_settlement_different_edges_check_interior(self): + test = DelayedSettlementTest(self.routers[2].addresses[0], + self.routers[5].addresses[0], + self.routers[0].addresses[0], + 'dest.08', 10, [1,2,3,4,5,6,7,8], True) + test.run() + self.assertEqual(None, test.error) + + +class Timeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.timeout() + + +class PollTimeout(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.poll_timeout() + + +class DelayedSettlementTest(MessagingHandler): + def __init__(self, sender_host, receiver_host, query_host, addr, dlv_count, stuck_list, close_link): + super(DelayedSettlementTest, self).__init__(auto_accept = False) + self.sender_host = sender_host + 
self.receiver_host = receiver_host + self.query_host = query_host + self.addr = addr + self.dlv_count = dlv_count + self.stuck_list = stuck_list + self.close_link = close_link + self.stuck_dlvs = [] + + self.sender_conn = None + self.receiver_conn = None + self.query_conn = None + self.error = None + self.n_tx = 0 + self.n_rx = 0 + self.expected_stuck = 0 + self.last_stuck = 0 + + def timeout(self): + self.error = "Timeout Expired - n_tx=%d, n_rx=%d, expected_stuck=%d last_stuck=%d" %\ + (self.n_tx, self.n_rx, self.expected_stuck, self.last_stuck) + self.sender_conn.close() + self.receiver_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.receiver_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(30.0, Timeout(self)) + self.poll_timer = None + self.sender_conn = event.container.connect(self.sender_host) + self.receiver_conn = event.container.connect(self.receiver_host) + self.query_conn = event.container.connect(self.query_host) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True) + self.query_sender = event.container.create_sender(self.query_conn, "$management") + + def on_link_opened(self, event): + if event.receiver == self.reply_receiver: + self.reply_addr = event.receiver.remote_source.address + self.proxy = MgmtMsgProxy(self.reply_addr) + + def on_sendable(self, event): + if event.sender == self.sender: + while self.sender.credit > 0 and self.n_tx < self.dlv_count: + self.sender.send(Message("Message %d" % self.n_tx)) + self.n_tx += 1 + + def on_message(self, event): + if event.receiver == self.receiver: + if self.n_rx not 
in self.stuck_list: + self.accept(event.delivery) + else: + self.stuck_dlvs.append(event.delivery) + self.n_rx += 1 + if self.n_rx == self.dlv_count: + self.query_stats(len(self.stuck_list) * 2) + elif event.receiver == self.reply_receiver: + response = self.proxy.response(event.message) + self.accept(event.delivery) + self.last_stuck = response.results[0].deliveriesStuck + if response.results[0].deliveriesStuck == self.expected_stuck: + if self.close_link: + self.receiver.close() + else: + for dlv in self.stuck_dlvs: + self.accept(dlv) + self.stuck_dlvs = [] + if self.expected_stuck > 0: + self.query_stats(0) + else: + self.fail(None) + else: + self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) + + def query_stats(self, expected_stuck): + self.expected_stuck = expected_stuck + msg = self.proxy.query_router() + self.query_sender.send(msg) + + def poll_timeout(self): + self.query_stats(self.expected_stuck) + + def run(self): + Container(self).run() + + +if __name__== '__main__': + unittest.main(main_module()) diff --git a/tools/qdstat.in b/tools/qdstat.in index c7399df..59f0191 100755 --- a/tools/qdstat.in +++ b/tools/qdstat.in @@ -310,9 +310,10 @@ class BusManager(Node): rows.append(('Released Count', router.releasedDeliveries)) rows.append(('Modified Count', router.modifiedDeliveries)) try: - rows.append(('Deliveries Delayed > 1sec', router.deliveriesDelayed1Sec)) + rows.append(('Deliveries Delayed > 1sec', router.deliveriesDelayed1Sec)) rows.append(('Deliveries Delayed > 10sec', router.deliveriesDelayed10Sec)) - rows.append(('Deliveries to Fallback', router.deliveriesRedirectedToFallback)) + rows.append(('Deliveries Stuck > 10sec', router.deliveriesStuck)) + rows.append(('Deliveries to Fallback', router.deliveriesRedirectedToFallback)) except: pass rows.append(('Ingress Count', router.deliveriesIngress)) @@ -345,14 +346,15 @@ class BusManager(Node): cols = ('linkType', 'linkDir', 'connectionId', 'identity', 'peer', 'owningAddr', 'capacity', 
'undeliveredCount', 'unsettledCount', 'deliveryCount', 'presettledCount', 'droppedPresettledCount', 'acceptedCount', 'rejectedCount', 'releasedCount', - 'modifiedCount', 'deliveriesDelayed1Sec', 'deliveriesDelayed10Sec', 'adminStatus', 'operStatus', - 'linkName', 'priority', 'settleRate') + 'modifiedCount', 'deliveriesDelayed1Sec', 'deliveriesDelayed10Sec', 'deliveriesStuck', + 'adminStatus', 'operStatus', 'linkName', 'priority', 'settleRate') objects = self.query('org.apache.qpid.dispatch.router.link', cols, limit=self.opts.limit) has_dropped_presettled_count = False has_priority = False has_delayed = False + has_stuck = False if show_date_id: self.display_datetime_router_id() @@ -366,6 +368,8 @@ class BusManager(Node): has_priority = True if hasattr(first_row, 'deliveriesDelayed1Sec'): has_delayed = True + if hasattr(first_row, 'deliveriesStuck'): + has_stuck = True if has_priority: heads.append(Header("pri")) @@ -384,6 +388,8 @@ class BusManager(Node): if has_delayed: heads.append(Header("delay")) heads.append(Header("rate")) + if has_stuck: + heads.append(Header("stuck")) if self.opts.verbose: heads.append(Header("admin")) heads.append(Header("oper")) @@ -415,6 +421,8 @@ class BusManager(Node): if has_delayed: row.append(link.deliveriesDelayed1Sec + link.deliveriesDelayed10Sec) row.append(link.settleRate) + if has_stuck: + row.append(link.deliveriesStuck) if self.opts.verbose: row.append(link.adminStatus) row.append(link.operStatus) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org