This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 39309ea DISPATCH-1322 - Prevent credit starvation in router-to-router links. Added protection for deliveries held in link processing outside the lock. This closes #493 39309ea is described below commit 39309ea3b1606e1a0382ee4177b5df984eea536a Author: Ted Ross <tr...@redhat.com> AuthorDate: Fri Apr 26 14:00:21 2019 -0400 DISPATCH-1322 - Prevent credit starvation in router-to-router links. Added protection for deliveries held in link processing outside the lock. This closes #493 --- src/router_core/delivery.c | 2 +- src/router_core/transfer.c | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index afcefa6..049dcbb 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -303,7 +303,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) // issued immediately even for unsettled deliveries. // if (moved && link->link_direction == QD_INCOMING && - link->link_type != QD_LINK_ROUTER && !link->connected_link) + link->link_type != QD_LINK_ROUTER && !link->edge && !link->connected_link) qdr_link_issue_credit_CT(core, link, 1, false); return moved; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 99b1db9..17afa8d 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -145,7 +145,9 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) sys_mutex_lock(conn->work_lock); dlv = DEQ_HEAD(link->undelivered); if (dlv) { + qdr_delivery_incref(dlv, "qdr_link_process_deliveries - holding the undelivered delivery locally"); uint64_t new_disp = 0; + // DISPATCH-1302 race hack fix: There is a race between the CORE thread // and the outbound (this) thread over settlement. 
It occurs when the CORE // thread is trying to propagate settlement to a peer (this delivery) @@ -175,6 +177,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) // If the undelivered list is cleared the link may have detached. Stop processing. offer = DEQ_SIZE(link->undelivered); if (offer == 0) { + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - closed link"); sys_mutex_unlock(conn->work_lock); return num_deliveries_completed; } @@ -193,6 +196,8 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) } } else { + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - not send_complete"); + // // The message is still being received/sent. // 1. We cannot remove the delivery from the undelivered list. @@ -215,8 +220,8 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) // the core will need to update the delivery's disposition if (new_disp) - qdr_delivery_update_disposition(((qd_router_t *)core->user_context)->router_core, - dlv, new_disp, true, 0, 0, false); + qdr_delivery_update_disposition(core, dlv, new_disp, true, 0, 0, false); + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - done processing"); } else { sys_mutex_unlock(conn->work_lock); break; @@ -407,8 +412,8 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery // AND the incoming link is targeted (not anonymous). // // We shall release the delivery (it is currently undeliverable). If the distribution is - // multicast, we will replenish the credit. If it is anycast, we will allow the credit to - // drain. + // multicast or it's on an edge connection, we will replenish the credit. Otherwise, we + // will allow the credit to drain. 
// if (dlv->settled) { // Increment the presettled_dropped_deliveries on the in_link @@ -426,12 +431,13 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery qdr_delivery_release_CT(core, dlv); // - // Drain credit on the link + // Drain credit on the link if it is not in an edge connection // - qdr_link_issue_credit_CT(core, link, 0, true); + if (!link->edge) + qdr_link_issue_credit_CT(core, link, 0, true); } - if (qdr_is_addr_treatment_multicast(link->owning_addr)) + if (link->edge || qdr_is_addr_treatment_multicast(link->owning_addr)) qdr_link_issue_credit_CT(core, link, 1, false); else link->credit_pending++; @@ -531,7 +537,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery // deliveries because it increases the risk of credit starvation if there // are many addresses sharing the link. // - if (link->link_type == QD_LINK_ROUTER) + if (link->link_type == QD_LINK_ROUTER || link->edge) qdr_link_issue_credit_CT(core, link, 1, false); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org