This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3a420  DISPATCH-1283 - Added protection of links from being freed while they are in IO processing
4b3a420 is described below

commit 4b3a42067b81bea6e5ba9a3f530ae1f6c577cc5d
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Fri Mar 22 16:30:26 2019 -0400

    DISPATCH-1283 - Added protection of links from being freed while they are in IO processing
---
 src/router_core/connections.c         | 225 +++++++++++++++++++++-------------
 src/router_core/router_core_private.h |   2 +
 2 files changed, 142 insertions(+), 85 deletions(-)

diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 24741fe..f3ba50d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -34,7 +34,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
 static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard);
 static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
+static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard);
 static void qdr_link_detach_sent(qdr_link_t *link);
+static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link);
 
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
@@ -244,6 +246,7 @@ int qdr_connection_process(qdr_connection_t *conn)
         ref = DEQ_HEAD(links_with_work[priority]);
         while (ref) {
             move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, 
QDR_LINK_LIST_CLASS_LOCAL);
+            ref->link->processing = true;
             ref = DEQ_NEXT(ref);
         }
     }
@@ -270,102 +273,113 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     // Process the links_with_work array from highest to lowest priority.
     for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
-        do {
+        ref = DEQ_HEAD(links_with_work[priority]);
+        while (ref) {
             qdr_link_work_t *link_work;
             detach_sent = false;
+            link = ref->link;
 
-            ref = DEQ_HEAD(links_with_work[priority]);
-            if (ref) {
-                link = ref->link;
-                qdr_del_link_ref(links_with_work + priority, ref->link, 
QDR_LINK_LIST_CLASS_LOCAL);
+            //
+            // The work lock must be used to protect accesses to the link's 
work_list and
+            // link_work->processing.
+            //
+            sys_mutex_lock(conn->work_lock);
+            link_work = DEQ_HEAD(link->work_list);
+            if (link_work) {
+                DEQ_REMOVE_HEAD(link->work_list);
+                link_work->processing = true;
+            }
+            sys_mutex_unlock(conn->work_lock);
 
-                //
-                // The work lock must be used to protect accesses to the 
link's work_list and
-                // link_work->processing.
-                //
-                sys_mutex_lock(conn->work_lock);
-                link_work = DEQ_HEAD(link->work_list);
-                if (link_work) {
-                    DEQ_REMOVE_HEAD(link->work_list);
-                    link_work->processing = true;
+            //
+            // Handle disposition/settlement updates
+            //
+            qdr_delivery_ref_list_t updated_deliveries;
+            sys_mutex_lock(conn->work_lock);
+            DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+            sys_mutex_unlock(conn->work_lock);
+
+            qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
+            while (dref) {
+                core->delivery_update_handler(core->user_context, dref->dlv, 
dref->dlv->disposition, dref->dlv->settled);
+                qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - 
remove from updated list");
+                qdr_del_delivery_ref(&updated_deliveries, dref);
+                dref = DEQ_HEAD(updated_deliveries);
+                event_count++;
+            }
+
+            while (link_work) {
+                switch (link_work->work_type) {
+                case QDR_LINK_WORK_DELIVERY :
+                    {
+                        int count = core->push_handler(core->user_context, 
link, link_work->value);
+                        assert(count <= link_work->value);
+                        link_work->value -= count;
+                        break;
+                    }
+
+                case QDR_LINK_WORK_FLOW :
+                    if (link_work->value > 0)
+                        core->flow_handler(core->user_context, link, 
link_work->value);
+                    if      (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_SET)
+                        core->drain_handler(core->user_context, link, true);
+                    else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
+                        core->drain_handler(core->user_context, link, false);
+                    else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
+                        core->drained_handler(core->user_context, link);
+                    break;
+
+                case QDR_LINK_WORK_FIRST_DETACH :
+                case QDR_LINK_WORK_SECOND_DETACH :
+                    core->detach_handler(core->user_context, link, 
link_work->error,
+                                         link_work->work_type == 
QDR_LINK_WORK_FIRST_DETACH,
+                                         link_work->close_link);
+                    detach_sent = true;
+                    break;
                 }
-                sys_mutex_unlock(conn->work_lock);
-            } else
-                link = 0;
 
-            if (link) {
-                //
-                // Handle disposition/settlement updates
-                //
-                qdr_delivery_ref_list_t updated_deliveries;
                 sys_mutex_lock(conn->work_lock);
-                DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+                if (link_work->work_type == QDR_LINK_WORK_DELIVERY && 
link_work->value > 0 && !link->detach_received) {
+                    DEQ_INSERT_HEAD(link->work_list, link_work);
+                    link_work->processing = false;
+                    link_work = 0; // Halt work processing
+                } else {
+                    qdr_error_free(link_work->error);
+                    free_qdr_link_work_t(link_work);
+                    link_work = DEQ_HEAD(link->work_list);
+                    if (link_work) {
+                        DEQ_REMOVE_HEAD(link->work_list);
+                        link_work->processing = true;
+                    }
+                }
                 sys_mutex_unlock(conn->work_lock);
+                event_count++;
+            }
 
-                qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
-                while (dref) {
-                    core->delivery_update_handler(core->user_context, 
dref->dlv, dref->dlv->disposition, dref->dlv->settled);
-                    qdr_delivery_decref(core, dref->dlv, 
"qdr_connection_process - remove from updated list");
-                    qdr_del_delivery_ref(&updated_deliveries, dref);
-                    dref = DEQ_HEAD(updated_deliveries);
-                    event_count++;
-                }
+            if (detach_sent) {
+                // let the core thread know so it can clean up
+                qdr_link_detach_sent(link);
+            }
 
-                while (link_work) {
-                    switch (link_work->work_type) {
-                    case QDR_LINK_WORK_DELIVERY :
-                        {
-                            int count = core->push_handler(core->user_context, 
link, link_work->value);
-                            assert(count <= link_work->value);
-                            link_work->value -= count;
-                            break;
-                        }
-
-                    case QDR_LINK_WORK_FLOW :
-                        if (link_work->value > 0)
-                            core->flow_handler(core->user_context, link, 
link_work->value);
-                        if      (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_SET)
-                            core->drain_handler(core->user_context, link, 
true);
-                        else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
-                            core->drain_handler(core->user_context, link, 
false);
-                        else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
-                            core->drained_handler(core->user_context, link);
-                        break;
+            ref = DEQ_NEXT(ref);
+        }
+    }
 
-                    case QDR_LINK_WORK_FIRST_DETACH :
-                    case QDR_LINK_WORK_SECOND_DETACH :
-                        core->detach_handler(core->user_context, link, 
link_work->error,
-                                             link_work->work_type == 
QDR_LINK_WORK_FIRST_DETACH,
-                                             link_work->close_link);
-                        detach_sent = true;
-                        break;
-                    }
+    sys_mutex_lock(conn->work_lock);
+    for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
+        ref = DEQ_HEAD(links_with_work[priority]);
+        while (ref) {
+            qdr_link_t *link = ref->link;
 
-                    sys_mutex_lock(conn->work_lock);
-                    if (link_work->work_type == QDR_LINK_WORK_DELIVERY && 
link_work->value > 0 && !link->detach_received) {
-                        DEQ_INSERT_HEAD(link->work_list, link_work);
-                        link_work->processing = false;
-                        link_work = 0; // Halt work processing
-                    } else {
-                        qdr_error_free(link_work->error);
-                        free_qdr_link_work_t(link_work);
-                        link_work = DEQ_HEAD(link->work_list);
-                        if (link_work) {
-                            DEQ_REMOVE_HEAD(link->work_list);
-                            link_work->processing = true;
-                        }
-                    }
-                    sys_mutex_unlock(conn->work_lock);
-                    event_count++;
-                }
+            link->processing = false;
+            if (link->ready_to_free)
+                qdr_link_processing_complete(core, link);
 
-                if (detach_sent) {
-                    // let the core thread know so it can clean up
-                    qdr_link_detach_sent(link);
-                }
-            }
-        } while (detach_sent || link);
+            qdr_del_link_ref(links_with_work + priority, ref->link, 
QDR_LINK_LIST_CLASS_LOCAL);
+            ref = DEQ_HEAD(links_with_work[priority]);
+        }
     }
+    sys_mutex_unlock(conn->work_lock);
 
     return event_count;
 }
@@ -549,6 +563,16 @@ static void qdr_link_detach_sent(qdr_link_t *link)
 }
 
 
+static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link)
+{
+    qdr_action_t *action = qdr_action(qdr_link_processing_complete_CT, 
"link_processing_complete");
+
+    action->args.connection.link = link;
+    qdr_action_enqueue(core, action);
+}
+
+
+
 void qdr_connection_handlers(qdr_core_t                *core,
                              void                      *context,
                              qdr_connection_activate_t  activate,
@@ -1759,7 +1783,18 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, 
qdr_action_t *action, b
         }
     } else if (link->detach_send_done) {  // detach count indicates detach has 
been scheduled
         // I/O thread is finished sending detach, ok to free link now
-        qdr_link_cleanup_CT(core, conn, link, "Link detached");
+
+        bool do_cleanup = false;
+        
+        sys_mutex_lock(conn->work_lock);
+        if (link->processing)
+            link->ready_to_free = true;
+        else
+            do_cleanup = true;
+        sys_mutex_unlock(conn->work_lock);
+
+        if (do_cleanup)
+            qdr_link_cleanup_CT(core, link->conn, link, "Link detached");
     }
 
     //
@@ -1786,9 +1821,29 @@ static void qdr_link_detach_sent_CT(qdr_core_t *core, 
qdr_action_t *action, bool
     if (link) {
         link->detach_send_done = true;
         if (link->conn && link->detach_received) {
-            // link is fully detached
-            qdr_link_cleanup_CT(core, link->conn, link, "Link detached");
+            bool do_cleanup = false;
+
+            sys_mutex_lock(link->conn->work_lock);
+            if (link->processing)
+                link->ready_to_free = true;
+            else
+                do_cleanup = true;
+            sys_mutex_unlock(link->conn->work_lock);
+
+            if (do_cleanup)
+                qdr_link_cleanup_CT(core, link->conn, link, "Link detached");
         }
     }
 }
 
+
+static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
+{
+    if (!discard) {
+        qdr_link_t *link = action->args.connection.link;
+
+        if (link)
+            qdr_link_cleanup_CT(core, link->conn, link, "Link cleanup deferred 
after IO processing");
+    }
+}
+
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index abe695f..1e4f69c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -472,6 +472,8 @@ struct qdr_link_t {
     bool                     detach_received;   ///< True on core receipt of 
inbound attach
     bool                     detach_send_done;  ///< True once the detach has 
been sent by the I/O thread
     bool                     edge;              ///< True if this link is in 
an edge-connection
+    bool                     processing;        ///< True if an IO thread is 
currently handling this link
+    bool                     ready_to_free;     ///< True if the core thread 
wanted to clean up the link but it was processing
     char                    *strip_prefix;
     char                    *insert_prefix;
     bool                     terminus_survives_disconnect;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to