This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 096f99c  DISPATCH-1927 - Lock link when removing the initial delivery and handle the related link_work object.
096f99c is described below

commit 096f99c7d46dd925f66815d78daac04c304fc781
Author: Ted Ross <tr...@apache.org>
AuthorDate: Tue Feb 2 11:46:29 2021 -0500

    DISPATCH-1927 - Lock link when removing the initial delivery and handle the related link_work object.
    
    DISPATCH-1927 - Expand the scope of the lock in qdr_delivery_continue
---
 src/adaptors/http2/http2_adaptor.c |  1 +
 src/router_core/connections.c      |  7 +++++--
 src/router_core/delivery.c         | 16 +++++++++-------
 3 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index fe2966c..a08328c 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -1854,6 +1854,7 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                                                      0,
                                                      &(stream_data->incoming_id));
         qdr_link_set_context(stream_data->in_link, stream_data);
+        return QD_DELIVERY_MOVED_TO_NEW_LINK;
     }
     else if (stream_data) {
         if (conn->ingress) {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 6bba0d5..c4c02ea 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1608,13 +1608,14 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn
 
 static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv)
 {
-    qdr_link_t *old_link  = safe_deref_qdr_link_t(dlv->link_sp);
-    int         ref_delta = -1; // Account for the action-list protection
+    int ref_delta = -1; // Account for the action-list protection
 
     //
     // Remove the delivery from its current link if needed
     //
+    qdr_link_t *old_link  = safe_deref_qdr_link_t(dlv->link_sp);
     if (!!old_link) {
+        sys_mutex_lock(old_link->conn->work_lock);
         switch (dlv->where) {
         case QDR_DELIVERY_NOWHERE:
             break;
@@ -1622,6 +1623,7 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l
         case QDR_DELIVERY_IN_UNDELIVERED:
             DEQ_REMOVE(old_link->undelivered, dlv);
             dlv->where = QDR_DELIVERY_NOWHERE;
+            dlv->link_work = 0;
             ref_delta--;
             break;
 
@@ -1637,6 +1639,7 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l
             ref_delta--;
             break;
         }
+        sys_mutex_unlock(old_link->conn->work_lock);
     }
 
     //
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 725178d..9421581 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -1056,10 +1056,12 @@ void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bo
         if (! peer->presettled && in_dlv->presettled) {
             peer->presettled       = in_dlv->presettled;
         }
-        qdr_link_work_t *work      = peer->link_work;
-        qdr_link_t      *peer_link = qdr_delivery_link(peer);
 
+        qdr_link_t *peer_link = qdr_delivery_link(peer);
         if (!!peer_link) {
+            sys_mutex_lock(peer_link->conn->work_lock);
+            qdr_link_work_t *work     = peer->link_work;
+            bool             activate = false;
 
             if (peer_link->streaming && !more) {
                 if (!peer_link->in_streaming_pool) {
@@ -1078,19 +1080,19 @@ void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bo
             // after the streaming message has been sent.
             //
             if (!!work) {
-                sys_mutex_lock(peer_link->conn->work_lock);
                 if (work->processing || work == DEQ_HEAD(peer_link->work_list)) {
                     qdr_add_link_ref(&peer_link->conn->links_with_work[peer_link->priority], peer_link, QDR_LINK_LIST_CLASS_WORK);
-                    sys_mutex_unlock(peer_link->conn->work_lock);
 
                     //
                     // Activate the outgoing connection for later processing.
                     //
-                    qdr_connection_activate_CT(core, peer_link->conn);
+                    activate = true;
                 }
-                else
-                    sys_mutex_unlock(peer_link->conn->work_lock);
             }
+            sys_mutex_unlock(peer_link->conn->work_lock);
+
+            if (activate)
+                qdr_connection_activate_CT(core, peer_link->conn);
         }
 
         peer = qdr_delivery_next_peer_CT(in_dlv);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to