This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new d68ceff  DISPATCH-2032: do not delete link_work while deliveries reference it
d68ceff is described below

commit d68ceff4a767ddec84fa044ecc8bb3d4a609c5d5
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Tue Apr 20 15:20:45 2021 -0400

    DISPATCH-2032: do not delete link_work while deliveries reference it
    
    This closes #1144
---
 src/alloc_pool.c                      |  1 -
 src/router_core/connections.c         | 21 ++++++++++++------
 src/router_core/delivery.c            |  1 +
 src/router_core/forwarder.c           | 10 ++++-----
 src/router_core/router_core.c         | 42 +++++++++++++++++++++++++++++++++--
 src/router_core/router_core_private.h |  6 ++++-
 src/router_core/transfer.c            | 22 ++++++++----------
 7 files changed, 73 insertions(+), 30 deletions(-)

diff --git a/src/alloc_pool.c b/src/alloc_pool.c
index 6cdd5f8..e4f85e4 100644
--- a/src/alloc_pool.c
+++ b/src/alloc_pool.c
@@ -107,7 +107,6 @@ static const char *leaking_types[] = {
     "qd_iterator_t",
     "qdr_action_t",
     "qdr_field_t",
-    "qdr_link_work_t",
     "qd_buffer_t",
     "qd_bitmask_t",
 
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 5ee6453..127ba0a 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -384,6 +384,7 @@ int qdr_connection_process(qdr_connection_t *conn)
             sys_mutex_lock(conn->work_lock);
             link_work = DEQ_HEAD(link->work_list);
             if (link_work) {
+                // link_work ref transfered to local link_work
                 DEQ_REMOVE_HEAD(link->work_list);
                 link_work->processing = true;
             }
@@ -438,14 +439,15 @@ int qdr_connection_process(qdr_connection_t *conn)
 
                 sys_mutex_lock(conn->work_lock);
                 if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
+                    // link_work ref transfered from link_work to work_list
                     DEQ_INSERT_HEAD(link->work_list, link_work);
                     link_work->processing = false;
                     link_work = 0; // Halt work processing
                 } else {
-                    qdr_error_free(link_work->error);
-                    free_qdr_link_work_t(link_work);
+                    qdr_link_work_release(link_work);
                     link_work = DEQ_HEAD(link->work_list);
                     if (link_work) {
+                        // link_work ref transfered to local link_work
                         DEQ_REMOVE_HEAD(link->work_list);
                         link_work->processing = true;
                     }
@@ -727,6 +729,8 @@ void qdr_link_enqueue_work_CT(qdr_core_t      *core,
     qdr_connection_t *conn = link->conn;
 
     sys_mutex_lock(conn->work_lock);
+    // expect: caller transfers refcount:
+    assert(sys_atomic_get(&work->ref_count) > 0);
     DEQ_INSERT_TAIL(link->work_list, work);
     qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK);
     sys_mutex_unlock(conn->work_lock);
@@ -768,6 +772,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
         d->where = QDR_DELIVERY_NOWHERE;
         if (on_shutdown)
             d->tracking_addr = 0;
+        qdr_link_work_release(d->link_work);
         d->link_work = 0;
         d = DEQ_NEXT(d);
     }
@@ -777,6 +782,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
     while (d) {
         assert(d->where == QDR_DELIVERY_IN_UNSETTLED);
         d->where = QDR_DELIVERY_NOWHERE;
+        qdr_link_work_release(d->link_work);
         d->link_work = 0;
         if (on_shutdown)
             d->tracking_addr = 0;
@@ -788,6 +794,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd
     while (d) {
         assert(d->where == QDR_DELIVERY_IN_SETTLED);
         d->where = QDR_DELIVERY_NOWHERE;
+        qdr_link_work_release(d->link_work);
         d->link_work = 0;
         if (on_shutdown)
             d->tracking_addr = 0;
@@ -1019,8 +1026,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li
     qdr_link_work_t *link_work = DEQ_HEAD(work_list);
     while (link_work) {
         DEQ_REMOVE_HEAD(work_list);
-        qdr_error_free(link_work->error);
-        free_qdr_link_work_t(link_work);
+        qdr_link_work_release(link_work);
         link_work = DEQ_HEAD(work_list);
     }
 
@@ -1186,9 +1192,9 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t
     //
     // tell the I/O thread to do the detach
     //
-    qdr_link_work_t *work = new_qdr_link_work_t();
-    ZERO(work);
-    work->work_type  = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH;
+
+    link->detach_count += 1;
+    qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH);
     work->close_link = close;
 
     if (error)
@@ -1641,6 +1647,7 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l
         case QDR_DELIVERY_IN_UNDELIVERED:
             DEQ_REMOVE(old_link->undelivered, dlv);
             dlv->where = QDR_DELIVERY_NOWHERE;
+            qdr_link_work_release(dlv->link_work);
             dlv->link_work = 0;
             // expect: caller holds reference to dlv (in action)
             assert(sys_atomic_get(&dlv->ref_count) > 1);
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 0ddd1d5..8de73e2 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -481,6 +481,7 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
         ref = DEQ_HEAD(delivery->peers);
     }
 
+    qdr_link_work_release(delivery->link_work);
     qd_bitmask_free(delivery->link_exclusion);
     qd_delivery_state_free(delivery->local_state);
     qd_delivery_state_free(delivery->remote_state);
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 88e0a31..71acb5b 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -232,8 +232,8 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link
             assert(dlv->link_work);
             if (dlv->link_work && (--dlv->link_work->value == 0)) {
                 DEQ_REMOVE(link->work_list, dlv->link_work);
-                qdr_error_free(dlv->link_work->error);
-                free_qdr_link_work_t(dlv->link_work);
+                qdr_link_work_release(dlv->link_work); // for work_list
+                qdr_link_work_release(dlv->link_work); // for dlv ref
                 dlv->link_work = 0;
             }
             dlv->disposition = PN_RELEASED;
@@ -275,15 +275,13 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery
     if (work && work->work_type == QDR_LINK_WORK_DELIVERY) {
         work->value++;
     } else {
-        work = new_qdr_link_work_t();
-        ZERO(work);
-        work->work_type = QDR_LINK_WORK_DELIVERY;
+        work            = qdr_link_work(QDR_LINK_WORK_DELIVERY);
         work->value     = 1;
         DEQ_INSERT_TAIL(out_link->work_list, work);
     }
     qdr_add_link_ref(&out_link->conn->links_with_work[out_link->priority], out_link, QDR_LINK_LIST_CLASS_WORK);
 
-    out_dlv->link_work = work;
+    out_dlv->link_work = qdr_link_work_getref(work);
     sys_mutex_unlock(out_link->conn->work_lock);
 
     //
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index b49a1aa..343de9b 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -26,6 +26,8 @@
 #include <stdio.h>
 #include <strings.h>
 
+ALLOC_DECLARE(qdr_link_work_t);
+
 ALLOC_DEFINE(qdr_address_t);
 ALLOC_DEFINE(qdr_address_config_t);
 ALLOC_DEFINE(qdr_node_t);
@@ -248,8 +250,7 @@ void qdr_core_free(qdr_core_t *core)
         qdr_link_work_t *link_work = DEQ_HEAD(link->work_list);
         while (link_work) {
             DEQ_REMOVE_HEAD(link->work_list);
-            qdr_error_free(link_work->error);
-            free_qdr_link_work_t(link_work);
+            qdr_link_work_release(link_work);
             link_work = DEQ_HEAD(link->work_list);
         }
         sys_mutex_unlock(link->conn->work_lock);
@@ -1076,3 +1077,40 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor
     DEQ_REMOVE(core->protocol_adaptors, adaptor);
     free(adaptor);
 }
+
+
+qdr_link_work_t *qdr_link_work(qdr_link_work_type_t type)
+{
+    qdr_link_work_t *work = new_qdr_link_work_t();
+    if (work) {
+        ZERO(work);
+        work->work_type = type;
+        sys_atomic_init(&work->ref_count, 1);
+    }
+    return work;
+}
+
+
+qdr_link_work_t *qdr_link_work_getref(qdr_link_work_t *work)
+{
+    if (work) {
+        uint32_t old = sys_atomic_inc(&work->ref_count);
+        (void)old;  // mask unused var compiler warning
+        assert(old != 0);
+    }
+    return work;
+}
+
+void qdr_link_work_release(qdr_link_work_t *work)
+{
+    if (work) {
+        uint32_t old = sys_atomic_dec(&work->ref_count);
+        assert(old != 0);
+        if (old == 1) {
+            qdr_error_free(work->error);
+            free_qdr_link_work_t(work);
+        }
+    }
+}
+
+
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 117580e..4852134 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -316,6 +316,7 @@ typedef enum {
 typedef struct qdr_link_work_t {
     DEQ_LINKS(struct qdr_link_work_t);
     qdr_link_work_type_t          work_type;
+    sys_atomic_t                  ref_count;
     qdr_error_t                  *error;
     int                           value;
     qdr_link_work_drain_action_t  drain_action;
@@ -323,9 +324,12 @@ typedef struct qdr_link_work_t {
     bool                          processing;
 } qdr_link_work_t;
 
-ALLOC_DECLARE(qdr_link_work_t);
 DEQ_DECLARE(qdr_link_work_t, qdr_link_work_list_t);
 
+qdr_link_work_t *qdr_link_work(qdr_link_work_type_t type);
+qdr_link_work_t *qdr_link_work_getref(qdr_link_work_t *work);
+void qdr_link_work_release(qdr_link_work_t *work);
+
 
 #define QDR_AGENT_MAX_COLUMNS 64
 #define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1)
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index f52c772..e7ed17d 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -207,6 +207,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
 
                         assert(dlv == DEQ_HEAD(link->undelivered));
                         DEQ_REMOVE_HEAD(link->undelivered);
+                        qdr_link_work_release(dlv->link_work);
                         dlv->link_work = 0;
 
                         if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) {
@@ -229,6 +230,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
                         //
                         if (dlv == DEQ_HEAD(link->undelivered)) {
                             DEQ_REMOVE_HEAD(link->undelivered);
+                            qdr_link_work_release(dlv->link_work);
                             dlv->link_work = 0;
                             dlv->where = QDR_DELIVERY_NOWHERE;
                             qd_nullify_safe_ptr(&dlv->link_sp);
@@ -301,8 +303,8 @@ void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link)
 
             if (dlv->link_work->value == 0) {
                 DEQ_REMOVE_HEAD(link->work_list);
-                qdr_error_free(dlv->link_work->error);
-                free_qdr_link_work_t(dlv->link_work);
+                qdr_link_work_release(dlv->link_work);  // for work_list ref
+                qdr_link_work_release(dlv->link_work);  // for dlv ref
                 dlv->link_work = 0;
             }
         }
@@ -439,10 +441,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
         if (clink->link_direction == QD_INCOMING)
             qdr_link_issue_credit_CT(core, link->connected_link, credit, drain);
         else {
-            work = new_qdr_link_work_t();
-            ZERO(work);
-            work->work_type = QDR_LINK_WORK_FLOW;
-            work->value     = credit;
+            work        = qdr_link_work(QDR_LINK_WORK_FLOW);
+            work->value = credit;
             if (drain)
                 work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
             qdr_link_enqueue_work_CT(core, clink, work);
@@ -460,9 +460,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar
         //
         if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) {
             if (drain_was_set) {
-                work = new_qdr_link_work_t();
-                ZERO(work);
-                work->work_type    = QDR_LINK_WORK_FLOW;
+                work               = qdr_link_work(QDR_LINK_WORK_FLOW);
                 work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED;
             }
 
@@ -944,10 +942,8 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo
         sys_mutex_unlock(conn->work_lock);
 
         // need a new work flow item
-        work = new_qdr_link_work_t();
-        ZERO(work);
-        work->work_type = QDR_LINK_WORK_FLOW;
-        work->value     = credit;
+        work        = qdr_link_work(QDR_LINK_WORK_FLOW);
+        work->value = credit;
         if (drain_changed)
             work->drain_action = drain_action;
         qdr_link_enqueue_work_CT(core, link, work);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to