This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new d68ceff DISPATCH-2032: do not delete link_work while deliveries reference it d68ceff is described below commit d68ceff4a767ddec84fa044ecc8bb3d4a609c5d5 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Tue Apr 20 15:20:45 2021 -0400 DISPATCH-2032: do not delete link_work while deliveries reference it This closes #1144 --- src/alloc_pool.c | 1 - src/router_core/connections.c | 21 ++++++++++++------ src/router_core/delivery.c | 1 + src/router_core/forwarder.c | 10 ++++----- src/router_core/router_core.c | 42 +++++++++++++++++++++++++++++++++-- src/router_core/router_core_private.h | 6 ++++- src/router_core/transfer.c | 22 ++++++++---------- 7 files changed, 73 insertions(+), 30 deletions(-) diff --git a/src/alloc_pool.c b/src/alloc_pool.c index 6cdd5f8..e4f85e4 100644 --- a/src/alloc_pool.c +++ b/src/alloc_pool.c @@ -107,7 +107,6 @@ static const char *leaking_types[] = { "qd_iterator_t", "qdr_action_t", "qdr_field_t", - "qdr_link_work_t", "qd_buffer_t", "qd_bitmask_t", diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 5ee6453..127ba0a 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -384,6 +384,7 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_lock(conn->work_lock); link_work = DEQ_HEAD(link->work_list); if (link_work) { + // link_work ref transfered to local link_work DEQ_REMOVE_HEAD(link->work_list); link_work->processing = true; } @@ -438,14 +439,15 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_lock(conn->work_lock); if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { + // link_work ref transfered from link_work to work_list DEQ_INSERT_HEAD(link->work_list, link_work); link_work->processing = false; link_work = 0; // Halt work processing } else { - qdr_error_free(link_work->error); - free_qdr_link_work_t(link_work); + qdr_link_work_release(link_work); link_work = DEQ_HEAD(link->work_list); if (link_work) { + // link_work ref transfered to local link_work DEQ_REMOVE_HEAD(link->work_list); link_work->processing = true; } @@ -727,6 +729,8 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core, qdr_connection_t *conn = link->conn; sys_mutex_lock(conn->work_lock); + // expect: caller transfers refcount: + assert(sys_atomic_get(&work->ref_count) > 0); DEQ_INSERT_TAIL(link->work_list, work); qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK); sys_mutex_unlock(conn->work_lock); @@ -768,6 +772,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd d->where = QDR_DELIVERY_NOWHERE; if (on_shutdown) d->tracking_addr = 0; + qdr_link_work_release(d->link_work); d->link_work = 0; d = DEQ_NEXT(d); } @@ -777,6 +782,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd while (d) { assert(d->where == QDR_DELIVERY_IN_UNSETTLED); d->where = QDR_DELIVERY_NOWHERE; + qdr_link_work_release(d->link_work); d->link_work = 0; if (on_shutdown) d->tracking_addr = 0; @@ -788,6 +794,7 @@ void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qd while (d) { assert(d->where == QDR_DELIVERY_IN_SETTLED); d->where = QDR_DELIVERY_NOWHERE; + qdr_link_work_release(d->link_work); d->link_work = 0; if (on_shutdown) d->tracking_addr = 0; @@ -1019,8 +1026,7 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li qdr_link_work_t *link_work = DEQ_HEAD(work_list); while (link_work) { DEQ_REMOVE_HEAD(work_list); - qdr_error_free(link_work->error); - free_qdr_link_work_t(link_work); + qdr_link_work_release(link_work); link_work = DEQ_HEAD(work_list); } @@ -1186,9 +1192,9 @@ void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t // // tell the I/O thread to do the detach // - qdr_link_work_t *work = new_qdr_link_work_t(); - ZERO(work); - work->work_type = ++link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH; + + link->detach_count += 1; + qdr_link_work_t *work = qdr_link_work(link->detach_count == 1 ? QDR_LINK_WORK_FIRST_DETACH : QDR_LINK_WORK_SECOND_DETACH); work->close_link = close; if (error) @@ -1641,6 +1647,7 @@ static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *l case QDR_DELIVERY_IN_UNDELIVERED: DEQ_REMOVE(old_link->undelivered, dlv); dlv->where = QDR_DELIVERY_NOWHERE; + qdr_link_work_release(dlv->link_work); dlv->link_work = 0; // expect: caller holds reference to dlv (in action) assert(sys_atomic_get(&dlv->ref_count) > 1); diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 0ddd1d5..8de73e2 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -481,6 +481,7 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de ref = DEQ_HEAD(delivery->peers); } + qdr_link_work_release(delivery->link_work); qd_bitmask_free(delivery->link_exclusion); qd_delivery_state_free(delivery->local_state); qd_delivery_state_free(delivery->remote_state); diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 88e0a31..71acb5b 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -232,8 +232,8 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link assert(dlv->link_work); if (dlv->link_work && (--dlv->link_work->value == 0)) { DEQ_REMOVE(link->work_list, dlv->link_work); - qdr_error_free(dlv->link_work->error); - free_qdr_link_work_t(dlv->link_work); + qdr_link_work_release(dlv->link_work); // for work_list + qdr_link_work_release(dlv->link_work); // for dlv ref dlv->link_work = 0; } dlv->disposition = PN_RELEASED; @@ -275,15 +275,13 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery if (work && work->work_type == QDR_LINK_WORK_DELIVERY) { work->value++; } else { - work = new_qdr_link_work_t(); - ZERO(work); - work->work_type = QDR_LINK_WORK_DELIVERY; + work = qdr_link_work(QDR_LINK_WORK_DELIVERY); work->value = 1; DEQ_INSERT_TAIL(out_link->work_list, work); } qdr_add_link_ref(&out_link->conn->links_with_work[out_link->priority], out_link, QDR_LINK_LIST_CLASS_WORK); - out_dlv->link_work = work; + out_dlv->link_work = qdr_link_work_getref(work); sys_mutex_unlock(out_link->conn->work_lock); // diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index b49a1aa..343de9b 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -26,6 +26,8 @@ #include <stdio.h> #include <strings.h> +ALLOC_DECLARE(qdr_link_work_t); + ALLOC_DEFINE(qdr_address_t); ALLOC_DEFINE(qdr_address_config_t); ALLOC_DEFINE(qdr_node_t); @@ -248,8 +250,7 @@ void qdr_core_free(qdr_core_t *core) qdr_link_work_t *link_work = DEQ_HEAD(link->work_list); while (link_work) { DEQ_REMOVE_HEAD(link->work_list); - qdr_error_free(link_work->error); - free_qdr_link_work_t(link_work); + qdr_link_work_release(link_work); link_work = DEQ_HEAD(link->work_list); } sys_mutex_unlock(link->conn->work_lock); @@ -1076,3 +1077,40 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor DEQ_REMOVE(core->protocol_adaptors, adaptor); free(adaptor); } + + +qdr_link_work_t *qdr_link_work(qdr_link_work_type_t type) +{ + qdr_link_work_t *work = new_qdr_link_work_t(); + if (work) { + ZERO(work); + work->work_type = type; + sys_atomic_init(&work->ref_count, 1); + } + return work; +} + + +qdr_link_work_t *qdr_link_work_getref(qdr_link_work_t *work) +{ + if (work) { + uint32_t old = sys_atomic_inc(&work->ref_count); + (void)old; // mask unused var compiler warning + assert(old != 0); + } + return work; +} + +void qdr_link_work_release(qdr_link_work_t *work) +{ + if (work) { + uint32_t old = sys_atomic_dec(&work->ref_count); + assert(old != 0); + if (old == 1) { + qdr_error_free(work->error); + free_qdr_link_work_t(work); + } + } +} + + diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 117580e..4852134 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -316,6 +316,7 @@ typedef enum { typedef struct qdr_link_work_t { DEQ_LINKS(struct qdr_link_work_t); qdr_link_work_type_t work_type; + sys_atomic_t ref_count; qdr_error_t *error; int value; qdr_link_work_drain_action_t drain_action; @@ -323,9 +324,12 @@ typedef struct qdr_link_work_t { bool processing; } qdr_link_work_t; -ALLOC_DECLARE(qdr_link_work_t); DEQ_DECLARE(qdr_link_work_t, qdr_link_work_list_t); +qdr_link_work_t *qdr_link_work(qdr_link_work_type_t type); +qdr_link_work_t *qdr_link_work_getref(qdr_link_work_t *work); +void qdr_link_work_release(qdr_link_work_t *work); + #define QDR_AGENT_MAX_COLUMNS 64 #define QDR_AGENT_COLUMN_NULL (QDR_AGENT_MAX_COLUMNS + 1) diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index f52c772..e7ed17d 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -207,6 +207,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) assert(dlv == DEQ_HEAD(link->undelivered)); DEQ_REMOVE_HEAD(link->undelivered); + qdr_link_work_release(dlv->link_work); dlv->link_work = 0; if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) { @@ -229,6 +230,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) // if (dlv == DEQ_HEAD(link->undelivered)) { DEQ_REMOVE_HEAD(link->undelivered); + qdr_link_work_release(dlv->link_work); dlv->link_work = 0; dlv->where = QDR_DELIVERY_NOWHERE; qd_nullify_safe_ptr(&dlv->link_sp); @@ -301,8 +303,8 @@ void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link) if (dlv->link_work->value == 0) { DEQ_REMOVE_HEAD(link->work_list); - qdr_error_free(dlv->link_work->error); - free_qdr_link_work_t(dlv->link_work); + qdr_link_work_release(dlv->link_work); // for work_list ref + qdr_link_work_release(dlv->link_work); // for dlv ref dlv->link_work = 0; } } @@ -439,10 +441,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar if (clink->link_direction == QD_INCOMING) qdr_link_issue_credit_CT(core, link->connected_link, credit, drain); else { - work = new_qdr_link_work_t(); - ZERO(work); - work->work_type = QDR_LINK_WORK_FLOW; - work->value = credit; + work = qdr_link_work(QDR_LINK_WORK_FLOW); + work->value = credit; if (drain) work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED; qdr_link_enqueue_work_CT(core, clink, work); @@ -460,9 +460,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar // if (link->link_direction == QD_OUTGOING && (credit > 0 || drain_was_set)) { if (drain_was_set) { - work = new_qdr_link_work_t(); - ZERO(work); - work->work_type = QDR_LINK_WORK_FLOW; + work = qdr_link_work(QDR_LINK_WORK_FLOW); work->drain_action = QDR_LINK_WORK_DRAIN_ACTION_DRAINED; } @@ -944,10 +942,8 @@ void qdr_link_issue_credit_CT(qdr_core_t *core, qdr_link_t *link, int credit, bo sys_mutex_unlock(conn->work_lock); // need a new work flow item - work = new_qdr_link_work_t(); - ZERO(work); - work->work_type = QDR_LINK_WORK_FLOW; - work->value = credit; + work = qdr_link_work(QDR_LINK_WORK_FLOW); + work->value = credit; if (drain_changed) work->drain_action = drain_action; qdr_link_enqueue_work_CT(core, link, work); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org