This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4fba8  DISPATCH-1463 - Added facility for detecting stuck deliveries in 
the router.     Added facility for the core thread to process background 
actions when there are no normal actions available.     Added counters to 
router and link for currently-stuck deliveries.     Added a core module to 
periodically check for long-unsettled (stuck) deliveries on links.     Added a 
test set for stuck-delivery detection     Exposed the new deliveries_stuck 
metric to the HTTP-accessible global  [...]
ef4fba8 is described below

commit ef4fba8b25bfa54d54c76d291cdc2493c6183ad3
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Fri Oct 25 12:43:36 2019 -0400

    DISPATCH-1463 - Added facility for detecting stuck deliveries in the router.
        Added facility for the core thread to process background actions when 
there are no normal actions available.
        Added counters to router and link for currently-stuck deliveries.
        Added a core module to periodically check for long-unsettled (stuck) 
deliveries on links.
        Added a test set for stuck-delivery detection
        Exposed the new deliveries_stuck metric to the HTTP-accessible global 
metrics
    This closes #600
---
 include/qpid/dispatch/router_core.h                |   1 +
 python/qpid_dispatch/management/qdrouter.json      |  10 +
 src/CMakeLists.txt                                 |   1 +
 src/http-libwebsockets.c                           |   2 +
 src/router_core/agent_link.c                       |  12 +-
 src/router_core/agent_link.h                       |   2 +-
 src/router_core/agent_router.c                     |  18 +-
 src/router_core/agent_router.h                     |   2 +-
 src/router_core/delivery.c                         |   8 +
 src/router_core/delivery.h                         |   1 +
 .../stuck_delivery_detection/delivery_tracker.c    | 166 +++++++++++++
 src/router_core/router_core.c                      |  11 +
 src/router_core/router_core_private.h              |   4 +
 src/router_core/router_core_thread.c               |  34 ++-
 tests/CMakeLists.txt                               |   1 +
 tests/system_test.py                               |   4 +
 tests/system_tests_stuck_deliveries.py             | 274 +++++++++++++++++++++
 tools/qdstat.in                                    |  16 +-
 18 files changed, 549 insertions(+), 18 deletions(-)

diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 1b2702b..5520888 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -854,6 +854,7 @@ typedef struct {
     size_t deliveries_egress_route_container;
     size_t deliveries_delayed_1sec;
     size_t deliveries_delayed_10sec;
+    size_t deliveries_stuck;
     size_t deliveries_redirected_to_fallback;
 }  qdr_global_stats_t;
 ALLOC_DECLARE(qdr_global_stats_t);
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index e297d58..7e54bc0 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -570,6 +570,11 @@
                     "graph": true,
                     "description": "The total number of settled deliveries 
that were held in the router for more than 10 seconds."
                 },
+                "deliveriesStuck": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The current number of deliveries that are 
unsettled and have been held in the router for more than 10 seconds."
+                },
                 "deliveriesIngress": {
                     "type": "integer",
                     "description":"Number of deliveries that were sent to it 
by a sender that is directly attached to the router.",
@@ -1524,6 +1529,11 @@
                     "graph": true,
                     "description": "The total number of settled deliveries 
that were held in the router for more than 10 seconds."
                 },
+                "deliveriesStuck": {
+                    "type": "integer",
+                    "graph": true,
+                    "description": "The current number of deliveries that are 
unsettled and have been held in the router for more than 10 seconds."
+                },
                 "settleRate": {
                     "type": "integer",
                     "graph": true,
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b70dde2..d57df77 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -99,6 +99,7 @@ set(qpid_dispatch_SOURCES
   router_core/modules/edge_addr_tracking/edge_addr_tracking.c
   router_core/modules/address_lookup_server/address_lookup_server.c
   router_core/modules/address_lookup_client/lookup_client.c
+  router_core/modules/stuck_delivery_detection/delivery_tracker.c
   router_node.c
   router_pynode.c
   schema_enum.c
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index ffdbd28..dda8b9d 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -471,6 +471,7 @@ static int 
stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stat
 static int stats_get_deliveries_egress_route_container(qdr_global_stats_t 
*stats) { return stats->deliveries_egress_route_container; }
 static int stats_get_deliveries_delayed_1sec(qdr_global_stats_t *stats) { 
return stats->deliveries_delayed_1sec; }
 static int stats_get_deliveries_delayed_10sec(qdr_global_stats_t *stats) { 
return stats->deliveries_delayed_10sec; }
+static int stats_get_deliveries_stuck(qdr_global_stats_t *stats) { return stats->deliveries_stuck; } /* current level, not a running total */
 static int stats_get_deliveries_redirected_to_fallback(qdr_global_stats_t 
*stats) { return stats->deliveries_redirected_to_fallback; }
 
 static struct metric_definition metrics[] = {
@@ -493,6 +494,9 @@ static struct metric_definition metrics[] = {
     {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container},
     {"deliveries_delayed_1sec", "counter", stats_get_deliveries_delayed_1sec},
     {"deliveries_delayed_10sec", "counter", stats_get_deliveries_delayed_10sec},
+    // deliveries_stuck is the current number of stuck deliveries; it falls again when
+    // they are settled, so it must be exported as a gauge rather than a counter.
+    {"deliveries_stuck", "gauge", stats_get_deliveries_stuck},
     {"deliveries_redirected_to_fallback", "counter", stats_get_deliveries_redirected_to_fallback}
 };
 static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]);
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 92d3ab6..cb6e016 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -44,9 +44,10 @@
 #define QDR_LINK_MODIFIED_COUNT           20
 #define QDR_LINK_DELAYED_1SEC             21
 #define QDR_LINK_DELAYED_10SEC            22
-#define QDR_LINK_INGRESS_HISTOGRAM        23
-#define QDR_LINK_PRIORITY                 24
-#define QDR_LINK_SETTLE_RATE              25
+#define QDR_LINK_DELIVERIES_STUCK         23
+#define QDR_LINK_INGRESS_HISTOGRAM        24
+#define QDR_LINK_PRIORITY                 25
+#define QDR_LINK_SETTLE_RATE              26
 
 const char *qdr_link_columns[] =
     {"name",
@@ -72,6 +73,7 @@ const char *qdr_link_columns[] =
      "modifiedCount",
      "deliveriesDelayed1Sec",
      "deliveriesDelayed10Sec",
+     "deliveriesStuck",
      "ingressHistogram",
      "priority",
      "settleRate",
@@ -230,6 +232,10 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, 
qd_composed_field_t *bod
         qd_compose_insert_ulong(body, link->deliveries_delayed_10sec);
         break;
 
+    case QDR_LINK_DELIVERIES_STUCK:
+        qd_compose_insert_ulong(body, link->deliveries_stuck);
+        break;
+
     case QDR_LINK_INGRESS_HISTOGRAM:
         if (link->ingress_histogram) {
             qd_compose_start_list(body);
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index 80fa8a4..20b6c65 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -29,7 +29,7 @@ void qdra_link_update_CT(qdr_core_t          *core,
                          qdr_query_t         *query,
                          qd_parsed_field_t   *in_body);
 
-#define QDR_LINK_COLUMN_COUNT  26
+#define QDR_LINK_COLUMN_COUNT  27
 
 const char *qdr_link_columns[QDR_LINK_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 74f5b8d..7bdc934 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -46,12 +46,13 @@
 #define QDR_ROUTER_MODIFIED_DELIVERIES                 19
 #define QDR_ROUTER_DELAYED_1SEC                        20
 #define QDR_ROUTER_DELAYED_10SEC                       21
-#define QDR_ROUTER_DELIVERIES_INGRESS                  22
-#define QDR_ROUTER_DELIVERIES_EGRESS                   23
-#define QDR_ROUTER_DELIVERIES_TRANSIT                  24
-#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER  25
-#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER   26
-#define QDR_ROUTER_DELIVERIES_REDIRECTED               27
+#define QDR_ROUTER_DELIVERIES_STUCK                    22
+#define QDR_ROUTER_DELIVERIES_INGRESS                  23
+#define QDR_ROUTER_DELIVERIES_EGRESS                   24
+#define QDR_ROUTER_DELIVERIES_TRANSIT                  25
+#define QDR_ROUTER_DELIVERIES_INGRESS_ROUTE_CONTAINER  26
+#define QDR_ROUTER_DELIVERIES_EGRESS_ROUTE_CONTAINER   27
+#define QDR_ROUTER_DELIVERIES_REDIRECTED               28
 
 
 const char *qdr_router_columns[] =
@@ -77,6 +78,7 @@ const char *qdr_router_columns[] =
      "modifiedDeliveries",
      "deliveriesDelayed1Sec",
      "deliveriesDelayed10Sec",
+     "deliveriesStuck",
      "deliveriesIngress",
      "deliveriesEgress",
      "deliveriesTransit",
@@ -197,6 +199,10 @@ static void qdr_agent_write_column_CT(qd_composed_field_t 
*body, int col, qdr_co
         qd_compose_insert_ulong(body, core->deliveries_delayed_10sec);
         break;
 
+    case QDR_ROUTER_DELIVERIES_STUCK:
+        qd_compose_insert_ulong(body, core->deliveries_stuck);
+        break;
+
     case QDR_ROUTER_DELIVERIES_INGRESS:
         qd_compose_insert_ulong(body, core->deliveries_ingress);
         break;
diff --git a/src/router_core/agent_router.h b/src/router_core/agent_router.h
index 860bf7a..7f0e271 100644
--- a/src/router_core/agent_router.h
+++ b/src/router_core/agent_router.h
@@ -21,7 +21,7 @@
 
 #include "router_core_private.h"
 
-#define QDR_ROUTER_COLUMN_COUNT  28
+#define QDR_ROUTER_COLUMN_COUNT  29
 
 const char *qdr_router_columns[QDR_ROUTER_COLUMN_COUNT + 1];
 
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 3e21ed1..8d41d3f 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -379,6 +379,14 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, 
qdr_delivery_t *delive
                 core->deliveries_delayed_1sec++;
         }
 
+        //
+        // If this delivery was marked as stuck, decrement the currently-stuck 
counters in the link and router.
+        //
+        if (delivery->stuck) {
+            link->deliveries_stuck--;
+            core->deliveries_stuck--;
+        }
+
         if (qd_bitmask_valid_bit_value(delivery->ingress_index) && 
link->ingress_histogram)
             link->ingress_histogram[delivery->ingress_index]++;
 
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index 16fcc43..fe3c858 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -63,6 +63,7 @@ struct qdr_delivery_t {
     qdr_delivery_ref_list_t peers;             /// Use this list if there if 
the delivery has more than one peer.
     bool                    multicast;         /// True if this delivery is 
targeted for a multicast address.
     bool                    via_edge;          /// True if this delivery 
arrived via an edge-connection.
+    bool                    stuck;             /// True if this delivery was 
counted as stuck.
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
diff --git 
a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c 
b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
new file mode 100644
index 0000000..eec3905
--- /dev/null
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/dispatch/ctools.h"
+#include "module.h"
+#include "delivery.h"
+#include <stdio.h>
+#include <inttypes.h>
+
+//
+// Sweep timing, in seconds.  The PROD_* values are used normally; when the
+// router runs with test hooks enabled, qdrc_delivery_tracker_enable_CT switches
+// to the shorter TEST_* values.
+//
+enum {
+    PROD_TIMER_INTERVAL = 30,  // delay before the next sweep of the open links starts
+    PROD_STUCK_AGE      = 10,  // age beyond which an outstanding delivery is counted as stuck
+    TEST_TIMER_INTERVAL = 5,
+    TEST_STUCK_AGE      = 3
+};
+
+static int timer_interval = PROD_TIMER_INTERVAL;
+static int stuck_age      = PROD_STUCK_AGE;
+
+static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+//
+// Module state: the owning core, the timer that starts each sweep, and a safe
+// pointer to the next open link to be examined by the sweep in progress.
+//
+typedef struct tracker_t {
+    qdr_core_t       *core;
+    qdr_core_timer_t *timer;
+    qdr_link_t_sp     next_link;
+} tracker_t;
+
+
+/*
+ * check_delivery_CT
+ *
+ * Mark one delivery as stuck if it is not already marked and the link's
+ * core_ticks timestamp lags the core's uptime by more than stuck_age.  Each newly
+ * marked delivery increments both the link's and the router's stuck counters;
+ * an INFO log is emitted whenever the link's stuck count rises to exactly 1.
+ *
+ * NOTE(review): the age is measured from link->core_ticks, not from a
+ * per-delivery timestamp, so all outstanding deliveries on a link share one
+ * age -- confirm this is the intended definition of "stuck".
+ */
+static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
+{
+    if (dlv->stuck)
+        return;  // already counted
+
+    if ((core->uptime_ticks - link->core_ticks) <= stuck_age)
+        return;  // not old enough yet
+
+    dlv->stuck = true;
+    link->deliveries_stuck++;
+    core->deliveries_stuck++;
+
+    //
+    // Log only on the transition from zero to one stuck delivery on this link,
+    // so a link with many stuck deliveries does not flood the log.
+    //
+    if (link->deliveries_stuck == 1) {
+        uint64_t conn_id = link->conn ? link->conn->identity : 0;
+        qd_log(core->log, QD_LOG_INFO,
+               "[C%"PRIu64"][L%"PRIu64"] "
+               "Stuck delivery: At least one delivery on this link has been undelivered/unsettled for more than %d seconds",
+               conn_id, link->identity, stuck_age);
+    }
+}
+
+
+/*
+ * process_link_CT
+ *
+ * Run check_delivery_CT on every delivery in the link's undelivered list,
+ * then on every delivery in its unsettled list.
+ */
+static void process_link_CT(qdr_core_t *core, qdr_link_t *link)
+{
+    for (qdr_delivery_t *pending = DEQ_HEAD(link->undelivered); pending; pending = DEQ_NEXT(pending))
+        check_delivery_CT(core, link, pending);
+
+    for (qdr_delivery_t *unsettled = DEQ_HEAD(link->unsettled); unsettled; unsettled = DEQ_NEXT(unsettled))
+        check_delivery_CT(core, link, unsettled);
+}
+
+
+/*
+ * timer_handler_CT
+ *
+ * Runs when the tracker's timer expires and starts a new sweep.  If there is at
+ * least one open link, tracker->next_link is pointed at the first one and a
+ * background action is queued to process it.  With no open links there is
+ * nothing to sweep, so the timer is simply re-armed for timer_interval.
+ */
+static void timer_handler_CT(qdr_core_t *core, void *context)
+{
+    tracker_t *tracker = (tracker_t*) context;
+
+    qd_log(core->log, QD_LOG_DEBUG, "Stuck Delivery Detection: Starting detection cycle");
+
+    qdr_link_t *head = DEQ_HEAD(core->open_links);
+    if (!head) {
+        qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+        return;
+    }
+
+    set_safe_ptr_qdr_link_t(head, &tracker->next_link);
+
+    qdr_action_t *sweep = qdr_action(action_handler_CT, "detect_stuck_deliveries");
+    sweep->args.general.context_1 = tracker;
+    qdr_action_background_enqueue(core, sweep);
+}
+
+
+/*
+ * action_handler_CT
+ *
+ * Background action that advances a sweep by one link.  Does nothing when the
+ * action is being discarded.  Otherwise it dereferences tracker->next_link.  If
+ * that link still exists, its deliveries are checked.  If another link follows
+ * it on the list, a new background action is queued for that link.  When the end
+ * of the list is reached, or the saved link is gone (presumably closed since
+ * the previous step), the sweep ends and the timer is re-armed for
+ * timer_interval.
+ */
+static void action_handler_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    tracker_t  *tracker = (tracker_t*) action->args.general.context_1;
+    qdr_link_t *current = safe_deref_qdr_link_t(tracker->next_link);
+    qdr_link_t *next    = NULL;
+
+    if (current) {
+        process_link_CT(core, current);
+        next = DEQ_NEXT(current);
+    }
+
+    if (!next) {
+        //
+        // Either the end of the open-link list was reached or the saved link is
+        // no longer valid.  Either way this sweep is over; schedule the next one.
+        //
+        qdr_core_timer_schedule_CT(core, tracker->timer, timer_interval);
+        return;
+    }
+
+    //
+    // More links remain: queue another background action for the next one.
+    // Background actions only run while the normal action list is empty.
+    //
+    set_safe_ptr_qdr_link_t(next, &tracker->next_link);
+    qdr_action_t *step = qdr_action(action_handler_CT, "detect_stuck_deliveries");
+    step->args.general.context_1 = tracker;
+    qdr_action_background_enqueue(core, step);
+}
+
+
+/*
+ * qdrc_delivery_tracker_enable_CT
+ *
+ * Module enable hook.  Always returns true, so the module is always enabled.
+ * When the router has test hooks turned on, the scan interval and age threshold
+ * are switched to the shorter TEST_* values.
+ */
+static bool qdrc_delivery_tracker_enable_CT(qdr_core_t *core)
+{
+    const bool test_mode = core->qd->test_hooks;
+
+    if (test_mode) {
+        timer_interval = TEST_TIMER_INTERVAL;
+        stuck_age      = TEST_STUCK_AGE;
+    }
+
+    return true;
+}
+
+
+/*
+ * qdrc_delivery_tracker_init_CT
+ *
+ * Module init hook.  Allocates a zeroed tracker and creates its sweep timer.
+ * It arms the timer so the first sweep starts after timer_interval, stores the
+ * tracker in *module_context, and logs the timing in use.
+ */
+static void qdrc_delivery_tracker_init_CT(qdr_core_t *core, void **module_context)
+{
+    tracker_t *tracker = NEW(tracker_t);
+    ZERO(tracker);
+
+    qdr_core_timer_t *sweep_timer = qdr_core_timer_CT(core, timer_handler_CT, tracker);
+    tracker->core  = core;
+    tracker->timer = sweep_timer;
+
+    qdr_core_timer_schedule_CT(core, sweep_timer, timer_interval);
+    *module_context = tracker;
+
+    qd_log(core->log, QD_LOG_INFO,
+           "Stuck delivery detection: Scan interval: %d seconds, Delivery age threshold: %d seconds",
+           timer_interval, stuck_age);
+}
+
+
+/*
+ * qdrc_delivery_tracker_final_CT
+ *
+ * Module finalize hook: releases the sweep timer, then the tracker itself.
+ */
+static void qdrc_delivery_tracker_final_CT(void *module_context)
+{
+    tracker_t *doomed = module_context;
+
+    qdr_core_timer_free_CT(doomed->core, doomed->timer);
+    free(doomed);
+}
+
+
+//
+// Register the module with the router core under the name
+// "stuck_delivery_detection", supplying its enable, init and finalize hooks.
+//
+QDR_CORE_MODULE_DECLARE("stuck_delivery_detection",
+                        qdrc_delivery_tracker_enable_CT,
+                        qdrc_delivery_tracker_init_CT,
+                        qdrc_delivery_tracker_final_CT)
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 67c145e..e5994a6 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -65,6 +65,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t 
mode, const char *area,
     core->action_lock = sys_mutex();
     core->running     = true;
     DEQ_INIT(core->action_list);
+    DEQ_INIT(core->action_list_background);
 
     core->work_lock = sys_mutex();
     DEQ_INIT(core->work_list);
@@ -329,6 +330,15 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t 
*action)
 }
 
 
+/*
+ * qdr_action_background_enqueue
+ *
+ * Append an action to the core's background action list and signal the core
+ * thread's condition variable, all under action_lock.  Unlike the normal
+ * action list, this list is processed only when the normal action_list is empty.
+ *
+ * NOTE(review): this change does not appear to drain action_list_background in
+ * qdr_core_free(); confirm that actions still queued here at shutdown are
+ * discarded and freed.
+ */
+void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action)
+{
+    sys_mutex_lock(core->action_lock);
+    DEQ_INSERT_TAIL(core->action_list_background, action);
+    sys_cond_signal(core->action_cond);
+    sys_mutex_unlock(core->action_lock);
+}
+
+
 qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t 
treatment, qdr_address_config_t *config)
 {
     if (treatment == QD_TREATMENT_UNAVAILABLE)
@@ -848,6 +858,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, 
qdr_action_t *action,
         stats->deliveries_egress_route_container = 
core->deliveries_egress_route_container;
         stats->deliveries_delayed_1sec = core->deliveries_delayed_1sec;
         stats->deliveries_delayed_10sec = core->deliveries_delayed_10sec;
+        stats->deliveries_stuck = core->deliveries_stuck;
         stats->deliveries_redirected_to_fallback = core->deliveries_redirected;
     }
     qdr_general_work_t *work = 
qdr_general_work(qdr_post_global_stats_response);
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index e08c70a..fa2591b 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -458,6 +458,7 @@ struct qdr_link_t {
     uint64_t  modified_deliveries;
     uint64_t  deliveries_delayed_1sec;
     uint64_t  deliveries_delayed_10sec;
+    uint64_t  deliveries_stuck;
     uint64_t  settled_deliveries[QDR_LINK_RATE_DEPTH];
     uint64_t *ingress_histogram;
     uint8_t   priority;
@@ -756,6 +757,7 @@ struct qdr_core_t {
     sys_thread_t      *thread;
     bool               running;
     qdr_action_list_t  action_list;
+    qdr_action_list_t  action_list_background;  /// Actions processed only 
when the action_list is empty
     sys_cond_t        *action_cond;
     sys_mutex_t       *action_lock;
 
@@ -863,6 +865,7 @@ struct qdr_core_t {
     uint64_t deliveries_ingress_route_container;
     uint64_t deliveries_delayed_1sec;
     uint64_t deliveries_delayed_10sec;
+    uint64_t deliveries_stuck;
     uint64_t deliveries_redirected;
 
     qdr_edge_conn_addr_t          edge_conn_addr;
@@ -893,6 +896,7 @@ void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char 
*label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
+void qdr_action_background_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, 
bool drain);
 void qdr_drain_inbound_undelivered_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_address_t *addr);
 void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr);
diff --git a/src/router_core/router_core_thread.c 
b/src/router_core/router_core_thread.c
index 8a4682c..2ce0f09 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -123,6 +123,32 @@ void qdr_modules_finalize(qdr_core_t *core)
 }
 
 
+/*
+ * router_core_process_background_action_LH
+ *
+ * Must be called with core->action_lock held; the lock is held again on return.
+ * If the background list is non-empty, its head is removed and run.  The lock is
+ * released while the handler runs, then re-acquired and the action is freed.
+ *
+ * Returns true if an action was run, false if the background list was empty.
+ */
+static bool router_core_process_background_action_LH(qdr_core_t *core)
+{
+    qdr_action_t *action = DEQ_HEAD(core->action_list_background);
+    if (!action)
+        return false;
+
+    DEQ_REMOVE_HEAD(core->action_list_background);
+
+    //
+    // Drop the lock while the handler runs so other threads can keep enqueueing
+    // actions.  The handler is told to discard if the core is no longer running.
+    //
+    sys_mutex_unlock(core->action_lock);
+    if (action->label)
+        qd_log(core->log, QD_LOG_TRACE, "Core background action '%s'%s", action->label, core->running ? "" : " (discard)");
+    action->action_handler(core, action, !core->running);
+    sys_mutex_lock(core->action_lock);
+
+    free_qdr_action_t(action);
+    return true;
+}
+
+
 void *router_core_thread(void *arg)
 {
     qdr_core_t        *core = (qdr_core_t*) arg;
@@ -138,15 +164,17 @@ void *router_core_thread(void *arg)
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", 
core->router_area, core->router_id);
     while (core->running) {
         //
-        // Use the lock only to protect the condition variable and the action 
list
+        // Use the lock only to protect the condition variable and the action 
lists
         //
         sys_mutex_lock(core->action_lock);
 
         //
         // Block on the condition variable when there is no action to do
         //
-        while (core->running && DEQ_IS_EMPTY(core->action_list))
-            sys_cond_wait(core->action_cond, core->action_lock);
+        while (core->running && DEQ_IS_EMPTY(core->action_list)) {
+            if (!router_core_process_background_action_LH(core))
+                sys_cond_wait(core->action_cond, core->action_lock);
+        }
 
         //
         // Move the entire action list to a private list so we can process it 
without
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index f7c86a1..78d2091 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -137,6 +137,7 @@ foreach(py_test_module
     system_tests_multicast
     system_tests_fallback_dest
     system_tests_router_mesh
+    system_tests_stuck_deliveries
     )
 
   add_test(${py_test_module} ${TEST_WRAP} ${PYTHON_TEST_COMMAND} -v 
${py_test_module})
diff --git a/tests/system_test.py b/tests/system_test.py
index 39471eb..327ccf2 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -1034,6 +1034,10 @@ class MgmtMsgProxy(object):
         ap = msg.properties
         return self._Response(ap['statusCode'], ap['statusDescription'], 
msg.body)
 
+    def query_router(self):
+        """Build a management QUERY request for the org.apache.qpid.dispatch.router type."""
+        request_props = {'operation': 'QUERY',
+                         'type': 'org.apache.qpid.dispatch.router'}
+        return Message(properties=request_props, reply_to=self.reply_addr)
+
     def query_connections(self):
         ap = {'operation': 'QUERY', 'type': 
'org.apache.qpid.dispatch.connection'}
         return Message(properties=ap, reply_to=self.reply_addr)
diff --git a/tests/system_tests_stuck_deliveries.py 
b/tests/system_tests_stuck_deliveries.py
new file mode 100644
index 0000000..c4f8b21
--- /dev/null
+++ b/tests/system_tests_stuck_deliveries.py
@@ -0,0 +1,274 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+from threading import Event
+from threading import Timer
+
+from proton import Message, Timeout, symbol
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import QdManager
+from system_test import unittest
+from system_tests_link_routes import ConnLinkRouteService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
+import re
+
+
+class AddrTimer(object):
+    """Reactor timer callback that calls parent.check_address() when it fires."""
+    # NOTE(review): not referenced in the visible part of this file; possibly left over.
+
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        owner = self.parent
+        owner.check_address()
+
+
+class RouterTest(TestCase):
+    """
+    Stuck-delivery detection tests over two connected interior routers, each
+    with two edge routers attached.
+
+    cls.routers index map (order of creation in setUpClass):
+      0: INT.A (interior)   2: EA1 (edge of INT.A)   4: EB1 (edge of INT.B)
+      1: INT.B (interior)   3: EA2 (edge of INT.A)   5: EB2 (edge of INT.B)
+    """
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start INT.A and INT.B, connect them, and attach two edge routers to each."""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, mode, connection, extra=None, args=None):
+            # args defaults to None (not []) so no list object is shared between calls.
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 'stripAnnotations': 'no'}),
+                connection
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True,
+                                                    cl_args=args if args is not None else []))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        edge_port_B       = cls.tester.get_port()
+
+        # NOTE(review): "-T" presumably enables the router's test hooks, which select the
+        # shorter stuck-delivery scan interval and age threshold -- confirm.
+        router('INT.A', 'interior', ('listener', {'role': 'inter-router', 'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_A}), ["-T"])
+        router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 'role': 'inter-router', 'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_B}), ["-T"])
+        router('EA1',   'edge',     ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A}), None, ["-T"])
+        router('EA2',   'edge',     ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_A}), None, ["-T"])
+        router('EB1',   'edge',     ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B}), None, ["-T"])
+        router('EB2',   'edge',     ('connector', {'name': 'edge', 'role': 'edge', 'port': edge_port_B}), None, ["-T"])
+
+        cls.routers[0].wait_router_connected('INT.B')
+        cls.routers[1].wait_router_connected('INT.A')
+
+    def _run_delayed_settlement(self, sender_idx, receiver_idx, query_idx, addr,
+                                stuck_list, close_link, dlv_count=10):
+        """
+        Run one DelayedSettlementTest between the routers at the given indices
+        of self.routers and assert that it finished without error.
+        """
+        test = DelayedSettlementTest(self.routers[sender_idx].addresses[0],
+                                     self.routers[receiver_idx].addresses[0],
+                                     self.routers[query_idx].addresses[0],
+                                     addr, dlv_count, stuck_list, close_link)
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_01_delayed_settlement_same_interior(self):
+        self._run_delayed_settlement(0, 0, 0, 'dest.01', [2], False)
+
+    def test_02_delayed_settlement_different_edges_check_sender(self):
+        self._run_delayed_settlement(2, 5, 2, 'dest.02', [2,3,8], False)
+
+    def test_03_delayed_settlement_different_edges_check_receiver(self):
+        self._run_delayed_settlement(2, 5, 5, 'dest.03', [2,4,9], False)
+
+    def test_04_delayed_settlement_different_edges_check_interior(self):
+        self._run_delayed_settlement(2, 5, 0, 'dest.04', [0,2,3,8], False)
+
+    def test_05_no_settlement_same_interior(self):
+        self._run_delayed_settlement(0, 0, 0, 'dest.05', [0,2,4,9], True)
+
+    def test_06_no_settlement_different_edges_check_sender(self):
+        self._run_delayed_settlement(2, 5, 2, 'dest.06', [9], True)
+
+    def test_07_no_settlement_different_edges_check_receiver(self):
+        self._run_delayed_settlement(2, 5, 5, 'dest.07', [0,9], True)
+
+    def test_08_no_settlement_different_edges_check_interior(self):
+        self._run_delayed_settlement(2, 5, 0, 'dest.08', [1,2,3,4,5,6,7,8], True)
+
+
+class Timeout(object):
+    """Reactor timer callback that calls parent.timeout() when the overall test timer fires."""
+    # NOTE(review): this class shadows the proton.Timeout imported at the top of the file.
+
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        handler = self.parent
+        handler.timeout()
+
+
+class PollTimeout(object):
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        self.parent.poll_timeout()
+
+
+class DelayedSettlementTest(MessagingHandler):
+    def __init__(self, sender_host, receiver_host, query_host, addr, 
dlv_count, stuck_list, close_link):
+        super(DelayedSettlementTest, self).__init__(auto_accept = False)
+        self.sender_host   = sender_host
+        self.receiver_host = receiver_host
+        self.query_host    = query_host
+        self.addr          = addr
+        self.dlv_count     = dlv_count
+        self.stuck_list    = stuck_list
+        self.close_link    = close_link
+        self.stuck_dlvs    = []
+
+        self.sender_conn    = None
+        self.receiver_conn  = None
+        self.query_conn     = None
+        self.error          = None
+        self.n_tx           = 0
+        self.n_rx           = 0
+        self.expected_stuck = 0
+        self.last_stuck     = 0
+
+    def timeout(self):
+        self.error = "Timeout Expired - n_tx=%d, n_rx=%d, expected_stuck=%d 
last_stuck=%d" %\
+            (self.n_tx, self.n_rx, self.expected_stuck, self.last_stuck)
+        self.sender_conn.close()
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+
+    def fail(self, error):
+        self.error = error
+        self.sender_conn.close()
+        self.receiver_conn.close()
+        self.query_conn.close()
+        if self.poll_timer:
+            self.poll_timer.cancel()
+        self.timer.cancel()
+
+    def on_start(self, event):
+        self.timer          = event.reactor.schedule(30.0, Timeout(self))
+        self.poll_timer     = None
+        self.sender_conn    = event.container.connect(self.sender_host)
+        self.receiver_conn  = event.container.connect(self.receiver_host)
+        self.query_conn     = event.container.connect(self.query_host)
+        self.sender         = event.container.create_sender(self.sender_conn, 
self.addr)
+        self.receiver       = 
event.container.create_receiver(self.receiver_conn, self.addr)
+        self.reply_receiver = event.container.create_receiver(self.query_conn, 
None, dynamic=True)
+        self.query_sender   = event.container.create_sender(self.query_conn, 
"$management")
+
+    def on_link_opened(self, event):
+        if event.receiver == self.reply_receiver:
+            self.reply_addr = event.receiver.remote_source.address
+            self.proxy      = MgmtMsgProxy(self.reply_addr)
+
+    def on_sendable(self, event):
+        if event.sender == self.sender:
+            while self.sender.credit > 0 and self.n_tx < self.dlv_count:
+                self.sender.send(Message("Message %d" % self.n_tx))
+                self.n_tx += 1
+
+    def on_message(self, event):
+        if event.receiver == self.receiver:
+            if self.n_rx not in self.stuck_list:
+                self.accept(event.delivery)
+            else:
+                self.stuck_dlvs.append(event.delivery)
+            self.n_rx += 1
+            if self.n_rx == self.dlv_count:
+                self.query_stats(len(self.stuck_list) * 2)
+        elif event.receiver == self.reply_receiver:
+            response = self.proxy.response(event.message)
+            self.accept(event.delivery)
+            self.last_stuck = response.results[0].deliveriesStuck
+            if response.results[0].deliveriesStuck == self.expected_stuck:
+                if self.close_link:
+                    self.receiver.close()
+                else:
+                    for dlv in self.stuck_dlvs:
+                        self.accept(dlv)
+                    self.stuck_dlvs = []
+                if self.expected_stuck > 0:
+                    self.query_stats(0)
+                else:
+                    self.fail(None)
+            else:
+                self.poll_timer = event.reactor.schedule(0.5, 
PollTimeout(self))
+
+    def query_stats(self, expected_stuck):
+        self.expected_stuck = expected_stuck
+        msg = self.proxy.query_router()
+        self.query_sender.send(msg)
+
+    def poll_timeout(self):
+        self.query_stats(self.expected_stuck)
+
+    def run(self):
+        Container(self).run()
+
+
+if __name__== '__main__':
+    unittest.main(main_module())
diff --git a/tools/qdstat.in b/tools/qdstat.in
index c7399df..59f0191 100755
--- a/tools/qdstat.in
+++ b/tools/qdstat.in
@@ -310,9 +310,10 @@ class BusManager(Node):
             rows.append(('Released Count', router.releasedDeliveries))
             rows.append(('Modified Count', router.modifiedDeliveries))
             try:
-                rows.append(('Deliveries Delayed > 1sec', 
router.deliveriesDelayed1Sec))
+                rows.append(('Deliveries Delayed > 1sec',  
router.deliveriesDelayed1Sec))
                 rows.append(('Deliveries Delayed > 10sec', 
router.deliveriesDelayed10Sec))
-                rows.append(('Deliveries to Fallback', 
router.deliveriesRedirectedToFallback))
+                rows.append(('Deliveries Stuck > 10sec',   
router.deliveriesStuck))
+                rows.append(('Deliveries to Fallback',     
router.deliveriesRedirectedToFallback))
             except:
                 pass
             rows.append(('Ingress Count', router.deliveriesIngress))
@@ -345,14 +346,15 @@ class BusManager(Node):
         cols = ('linkType', 'linkDir', 'connectionId', 'identity', 'peer', 
'owningAddr',
                 'capacity', 'undeliveredCount', 'unsettledCount', 
'deliveryCount',
                 'presettledCount', 'droppedPresettledCount', 'acceptedCount', 
'rejectedCount', 'releasedCount',
-                'modifiedCount', 'deliveriesDelayed1Sec', 
'deliveriesDelayed10Sec', 'adminStatus', 'operStatus',
-                'linkName', 'priority', 'settleRate')
+                'modifiedCount', 'deliveriesDelayed1Sec', 
'deliveriesDelayed10Sec', 'deliveriesStuck',
+                'adminStatus', 'operStatus', 'linkName', 'priority', 
'settleRate')
 
         objects = self.query('org.apache.qpid.dispatch.router.link', cols, 
limit=self.opts.limit)
 
         has_dropped_presettled_count = False
         has_priority = False
         has_delayed  = False
+        has_stuck    = False
 
         if show_date_id:
             self.display_datetime_router_id()
@@ -366,6 +368,8 @@ class BusManager(Node):
                     has_priority = True
                 if hasattr(first_row, 'deliveriesDelayed1Sec'):
                     has_delayed = True
+                if hasattr(first_row, 'deliveriesStuck'):
+                    has_stuck = True
 
         if has_priority:
             heads.append(Header("pri"))
@@ -384,6 +388,8 @@ class BusManager(Node):
         if has_delayed:
             heads.append(Header("delay"))
             heads.append(Header("rate"))
+        if has_stuck:
+            heads.append(Header("stuck"))
         if self.opts.verbose:
             heads.append(Header("admin"))
             heads.append(Header("oper"))
@@ -415,6 +421,8 @@ class BusManager(Node):
             if has_delayed:
                 row.append(link.deliveriesDelayed1Sec + 
link.deliveriesDelayed10Sec)
                 row.append(link.settleRate)
+            if has_stuck:
+                row.append(link.deliveriesStuck)
             if self.opts.verbose:
                 row.append(link.adminStatus)
                 row.append(link.operStatus)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to