Repository: qpid-dispatch Updated Branches: refs/heads/master e22091b3f -> db96fd8f1
DISPATCH-788 - First attempt at adding a core thread API around handling of peers. The next step is to introduce a list to hold more than one peer (cherry picked from commit b196ebb3159ed7670f1f295d06fed97ae0bedda1) Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/db96fd8f Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/db96fd8f Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/db96fd8f Branch: refs/heads/master Commit: db96fd8f1748b4c7faeefeb8580567143ad5350a Parents: e22091b Author: Ganesh Murthy <gmur...@redhat.com> Authored: Mon Jun 19 10:57:26 2017 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Tue Jun 20 13:04:11 2017 -0400 ---------------------------------------------------------------------- src/router_core/connections.c | 21 +++++------- src/router_core/forwarder.c | 37 +++++++++----------- src/router_core/router_core_private.h | 24 +++++++++++++ src/router_core/transfer.c | 54 ++++++++++++++++++++++++++---- 4 files changed, 94 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index f0b8d8e..df2f69e 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -673,13 +673,11 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li qdr_delivery_t *peer; while (dlv) { DEQ_REMOVE_HEAD(undelivered); - peer = dlv->peer; - if (peer) { - dlv->peer = 0; - peer->peer = 0; + peer = qdr_delivery_first_peer_CT(dlv); + while (peer) { qdr_delivery_release_CT(core, peer); - qdr_delivery_decref_CT(core, peer); - qdr_delivery_decref_CT(core, dlv); + qdr_delivery_unlink_peers_CT(core, dlv, peer); + peer = qdr_delivery_next_peer_CT(dlv); } // @@ -715,15 +713,12 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li dlv->tracking_addr = 0; } - peer = dlv->peer; - if (peer) { - dlv->peer = 0; - peer->peer = 0; + peer = qdr_delivery_first_peer_CT(dlv); + while (peer) { if (link->link_direction == QD_OUTGOING) qdr_delivery_failed_CT(core, peer); - - qdr_delivery_decref_CT(core, peer); - qdr_delivery_decref_CT(core, dlv); + qdr_delivery_unlink_peers_CT(core, dlv, peer); + peer = qdr_delivery_next_peer_CT(dlv); } // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 8cfcb03..7828c6a 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -101,33 +101,26 @@ static void qdr_forward_find_closest_remotes_CT(qdr_core_t *core, qdr_address_t qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, qdr_link_t *link, qd_message_t *msg) { - qdr_delivery_t *dlv = new_qdr_delivery_t(); - uint64_t *tag = (uint64_t*) dlv->tag; - - ZERO(dlv); - sys_atomic_init(&dlv->ref_count, 0); - dlv->link = link; - dlv->msg = qd_message_copy(msg); - dlv->settled = !in_dlv || in_dlv->settled; - dlv->presettled = dlv->settled; + qdr_delivery_t *out_dlv = new_qdr_delivery_t(); + uint64_t *tag = (uint64_t*) out_dlv->tag; + + ZERO(out_dlv); + sys_atomic_init(&out_dlv->ref_count, 0); + out_dlv->link = link; + out_dlv->msg = qd_message_copy(msg); + out_dlv->settled = !in_dlv || in_dlv->settled; + out_dlv->presettled = out_dlv->settled; *tag = core->next_tag++; - dlv->tag_length = 8; - dlv->error = 0; + out_dlv->tag_length = 8; + out_dlv->error = 0; // - // Create peer linkage only if the delivery is not settled + // Create peer linkage only if the outgoing delivery is not settled // - if (!dlv->settled) { - if (in_dlv && in_dlv->peer == 0) { - dlv->peer = in_dlv; - in_dlv->peer = dlv; + if (!out_dlv->settled && in_dlv) + qdr_delivery_link_peers_CT(in_dlv, out_dlv); - qdr_delivery_incref(dlv); - qdr_delivery_incref(in_dlv); - } - } - - return dlv; + return out_dlv; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 07d832b..f33aeda 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -309,6 +309,7 @@ struct qdr_delivery_t { sys_atomic_t ref_count; qdr_link_t *link; qdr_delivery_t *peer; + qdr_delivery_t *next_peer; qd_message_t *msg; qd_iterator_t *to_addr; qd_iterator_t *origin; @@ -693,6 +694,29 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery); + +/** + * Links the in_dlv to the out_dlv and increments ref counts of both deliveries + */ +void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv); + +/** + * Zeroes out peer references from both peers and decrefs ref counts. + */ +void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer); + +/** + * Returns the first peer of the delivery. + * @see qdr_delivery_next_peer_CT + */ +qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv); + +/** + * Returns the next peer of the passed in delivery. + */ +qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv); + + void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/db96fd8f/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 40d4c45..7dc4970 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -413,6 +413,51 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de } +void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) +{ + assert(!in_dlv->peer); + assert(!out_dlv->peer); + + out_dlv->peer = in_dlv; + in_dlv->peer = out_dlv; + + qdr_delivery_incref(out_dlv); + qdr_delivery_incref(in_dlv); +} + + +void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer) +{ + // + // Make sure that the passed in deliveries are indeed peers. + // + + assert(dlv->peer == peer); + assert(peer->peer == dlv); + + dlv->peer = 0; + peer->peer = 0; + + qdr_delivery_decref_CT(core, dlv); + qdr_delivery_decref_CT(core, peer); +} + + +qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv) +{ + dlv->next_peer = dlv->peer; + return dlv->next_peer; +} + +qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv) +{ + //Get the next peer. In the current case we have no next peer. + // When a peer list is introduced, this function might return something based on the content of the peer list. + dlv->next_peer = 0; + return dlv->next_peer; +} + + void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv) { uint32_t ref_count = sys_atomic_dec(&dlv->ref_count); @@ -671,7 +716,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { qdr_delivery_t *dlv = action->args.delivery.delivery; - qdr_delivery_t *peer = dlv->peer; + qdr_delivery_t *peer = qdr_delivery_first_peer_CT(dlv); bool push = false; bool peer_moved = false; bool dlv_moved = false; @@ -705,17 +750,12 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool if (settled) { if (peer) { peer->settled = true; - peer->peer = 0; - dlv->peer = 0; - if (peer->link) { peer_moved = qdr_delivery_settled_CT(core, peer); if (peer_moved) push = true; } - - qdr_delivery_decref_CT(core, dlv); - qdr_delivery_decref_CT(core, peer); + qdr_delivery_unlink_peers_CT(core, dlv, peer); } if (dlv->link) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org