This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 4b3a420 DISPATCH-1283 - Added protection of links from being freed while they are in IO processing 4b3a420 is described below commit 4b3a42067b81bea6e5ba9a3f530ae1f6c577cc5d Author: Ted Ross <tr...@redhat.com> AuthorDate: Fri Mar 22 16:30:26 2019 -0400 DISPATCH-1283 - Added protection of links from being freed while they are in IO processing --- src/router_core/connections.c | 225 +++++++++++++++++++++------------- src/router_core/router_core_private.h | 2 + 2 files changed, 142 insertions(+), 85 deletions(-) diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 24741fe..f3ba50d 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -34,7 +34,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_detach_sent(qdr_link_t *link); +static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link); ALLOC_DEFINE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); @@ -244,6 +246,7 @@ int qdr_connection_process(qdr_connection_t *conn) ref = DEQ_HEAD(links_with_work[priority]); while (ref) { move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, QDR_LINK_LIST_CLASS_LOCAL); + ref->link->processing = true; ref = DEQ_NEXT(ref); } } @@ -270,102 +273,113 @@ int qdr_connection_process(qdr_connection_t *conn) // Process the links_with_work array from highest to lowest priority. 
for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) { - do { + ref = DEQ_HEAD(links_with_work[priority]); + while (ref) { qdr_link_work_t *link_work; detach_sent = false; + link = ref->link; - ref = DEQ_HEAD(links_with_work[priority]); - if (ref) { - link = ref->link; - qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL); + // + // The work lock must be used to protect accesses to the link's work_list and + // link_work->processing. + // + sys_mutex_lock(conn->work_lock); + link_work = DEQ_HEAD(link->work_list); + if (link_work) { + DEQ_REMOVE_HEAD(link->work_list); + link_work->processing = true; + } + sys_mutex_unlock(conn->work_lock); - // - // The work lock must be used to protect accesses to the link's work_list and - // link_work->processing. - // - sys_mutex_lock(conn->work_lock); - link_work = DEQ_HEAD(link->work_list); - if (link_work) { - DEQ_REMOVE_HEAD(link->work_list); - link_work->processing = true; + // + // Handle disposition/settlement updates + // + qdr_delivery_ref_list_t updated_deliveries; + sys_mutex_lock(conn->work_lock); + DEQ_MOVE(link->updated_deliveries, updated_deliveries); + sys_mutex_unlock(conn->work_lock); + + qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries); + while (dref) { + core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); + qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list"); + qdr_del_delivery_ref(&updated_deliveries, dref); + dref = DEQ_HEAD(updated_deliveries); + event_count++; + } + + while (link_work) { + switch (link_work->work_type) { + case QDR_LINK_WORK_DELIVERY : + { + int count = core->push_handler(core->user_context, link, link_work->value); + assert(count <= link_work->value); + link_work->value -= count; + break; + } + + case QDR_LINK_WORK_FLOW : + if (link_work->value > 0) + core->flow_handler(core->user_context, link, link_work->value); + if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_SET) + core->drain_handler(core->user_context, link, true); + else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR) + core->drain_handler(core->user_context, link, false); + else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED) + core->drained_handler(core->user_context, link); + break; + + case QDR_LINK_WORK_FIRST_DETACH : + case QDR_LINK_WORK_SECOND_DETACH : + core->detach_handler(core->user_context, link, link_work->error, + link_work->work_type == QDR_LINK_WORK_FIRST_DETACH, + link_work->close_link); + detach_sent = true; + break; } - sys_mutex_unlock(conn->work_lock); - } else - link = 0; - if (link) { - // - // Handle disposition/settlement updates - // - qdr_delivery_ref_list_t updated_deliveries; sys_mutex_lock(conn->work_lock); - DEQ_MOVE(link->updated_deliveries, updated_deliveries); + if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { + DEQ_INSERT_HEAD(link->work_list, link_work); + link_work->processing = false; + link_work = 0; // Halt work processing + } else { + qdr_error_free(link_work->error); + free_qdr_link_work_t(link_work); + link_work = DEQ_HEAD(link->work_list); + if (link_work) { + DEQ_REMOVE_HEAD(link->work_list); + link_work->processing = true; + } + } sys_mutex_unlock(conn->work_lock); + event_count++; + } - qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries); - while (dref) { - core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); - qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list"); - qdr_del_delivery_ref(&updated_deliveries, dref); - dref = DEQ_HEAD(updated_deliveries); - event_count++; - } + if (detach_sent) { + // let the core thread know so it can clean up + qdr_link_detach_sent(link); + } - while (link_work) { - switch (link_work->work_type) { - case QDR_LINK_WORK_DELIVERY : - { - int count = 
core->push_handler(core->user_context, link, link_work->value); - assert(count <= link_work->value); - link_work->value -= count; - break; - } - - case QDR_LINK_WORK_FLOW : - if (link_work->value > 0) - core->flow_handler(core->user_context, link, link_work->value); - if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET) - core->drain_handler(core->user_context, link, true); - else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR) - core->drain_handler(core->user_context, link, false); - else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED) - core->drained_handler(core->user_context, link); - break; + ref = DEQ_NEXT(ref); + } + } - case QDR_LINK_WORK_FIRST_DETACH : - case QDR_LINK_WORK_SECOND_DETACH : - core->detach_handler(core->user_context, link, link_work->error, - link_work->work_type == QDR_LINK_WORK_FIRST_DETACH, - link_work->close_link); - detach_sent = true; - break; - } + sys_mutex_lock(conn->work_lock); + for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) { + ref = DEQ_HEAD(links_with_work[priority]); + while (ref) { + qdr_link_t *link = ref->link; - sys_mutex_lock(conn->work_lock); - if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { - DEQ_INSERT_HEAD(link->work_list, link_work); - link_work->processing = false; - link_work = 0; // Halt work processing - } else { - qdr_error_free(link_work->error); - free_qdr_link_work_t(link_work); - link_work = DEQ_HEAD(link->work_list); - if (link_work) { - DEQ_REMOVE_HEAD(link->work_list); - link_work->processing = true; - } - } - sys_mutex_unlock(conn->work_lock); - event_count++; - } + link->processing = false; + if (link->ready_to_free) + qdr_link_processing_complete(core, link); - if (detach_sent) { - // let the core thread know so it can clean up - qdr_link_detach_sent(link); - } - } - } while (detach_sent || link); + qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL); + 
ref = DEQ_HEAD(links_with_work[priority]); + } } + sys_mutex_unlock(conn->work_lock); return event_count; } @@ -549,6 +563,16 @@ static void qdr_link_detach_sent(qdr_link_t *link) } +static void qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link) +{ + qdr_action_t *action = qdr_action(qdr_link_processing_complete_CT, "link_processing_complete"); + + action->args.connection.link = link; + qdr_action_enqueue(core, action); +} + + + void qdr_connection_handlers(qdr_core_t *core, void *context, qdr_connection_activate_t activate, @@ -1759,7 +1783,18 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b } } else if (link->detach_send_done) { // detach count indicates detach has been scheduled // I/O thread is finished sending detach, ok to free link now - qdr_link_cleanup_CT(core, conn, link, "Link detached"); + + bool do_cleanup = false; + + sys_mutex_lock(conn->work_lock); + if (link->processing) + link->ready_to_free = true; + else + do_cleanup = true; + sys_mutex_unlock(conn->work_lock); + + if (do_cleanup) + qdr_link_cleanup_CT(core, link->conn, link, "Link detached"); } // @@ -1786,9 +1821,29 @@ static void qdr_link_detach_sent_CT(qdr_core_t *core, qdr_action_t *action, bool if (link) { link->detach_send_done = true; if (link->conn && link->detach_received) { - // link is fully detached - qdr_link_cleanup_CT(core, link->conn, link, "Link detached"); + bool do_cleanup = false; + + sys_mutex_lock(link->conn->work_lock); + if (link->processing) + link->ready_to_free = true; + else + do_cleanup = true; + sys_mutex_unlock(link->conn->work_lock); + + if (do_cleanup) + qdr_link_cleanup_CT(core, link->conn, link, "Link detached"); } } } + +static void qdr_link_processing_complete_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (!discard) { + qdr_link_t *link = action->args.connection.link; + + if (link) + qdr_link_cleanup_CT(core, link->conn, link, "Link cleanup deferred after IO processing"); + } +} + diff --git 
a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index abe695f..1e4f69c 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -472,6 +472,8 @@ struct qdr_link_t { bool detach_received; ///< True on core receipt of inbound attach bool detach_send_done; ///< True once the detach has been sent by the I/O thread bool edge; ///< True if this link is in an edge-connection + bool processing; ///< True if an IO thread is currently handling this link + bool ready_to_free; ///< True if the core thread wanted to clean up the link but it was processing char *strip_prefix; char *insert_prefix; bool terminus_survives_disconnect; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org