Repository: qpid-dispatch Updated Branches: refs/heads/master db96fd8f1 -> 653628894
DISPATCH-788 - Part 2 - Added a list of peer references in case there is more than one peer. This will help in the future when we introduce large message streaming (cherry picked from commit 33aab339be641e0803631018df10a30550be2e2d) Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/65362889 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/65362889 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/65362889 Branch: refs/heads/master Commit: 6536288944d0051e407c6ce57c4cef8bc7dd0a88 Parents: db96fd8 Author: Ganesh Murthy <gmur...@redhat.com> Authored: Tue Jun 20 14:24:30 2017 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Wed Jun 21 17:08:00 2017 -0400 ---------------------------------------------------------------------- src/router_core/forwarder.c | 4 +- src/router_core/router_core.c | 2 +- src/router_core/router_core_private.h | 61 ++++++++++++------------ src/router_core/transfer.c | 75 +++++++++++++++++++++++++----- 4 files changed, 99 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 7828c6a..e31a97a 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -110,14 +110,14 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in out_dlv->msg = qd_message_copy(msg); out_dlv->settled = !in_dlv || in_dlv->settled; out_dlv->presettled = out_dlv->settled; - *tag = core->next_tag++; + *tag = core->next_tag++; out_dlv->tag_length = 8; out_dlv->error = 0; // // Create peer linkage only if the outgoing delivery is not settled // - if (!out_dlv->settled && in_dlv) + if (!out_dlv->settled) 
qdr_delivery_link_peers_CT(in_dlv, out_dlv); return out_dlv; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 9fd47ae..76e2270 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -382,7 +382,7 @@ void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_ } -void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv) +void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv) { qdr_delivery_ref_t *ref = new_qdr_delivery_ref_t(); DEQ_ITEM_INIT(ref); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index f33aeda..61e5afc 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -303,34 +303,6 @@ typedef enum { QDR_DELIVERY_IN_UNSETTLED } qdr_delivery_where_t; -struct qdr_delivery_t { - DEQ_LINKS(qdr_delivery_t); - void *context; - sys_atomic_t ref_count; - qdr_link_t *link; - qdr_delivery_t *peer; - qdr_delivery_t *next_peer; - qd_message_t *msg; - qd_iterator_t *to_addr; - qd_iterator_t *origin; - uint64_t disposition; - pn_data_t *extension_state; - qdr_error_t *error; - bool settled; - bool presettled; - bool cleared_proton_ref; - qdr_delivery_where_t where; - uint8_t tag[32]; - int tag_length; - qd_bitmask_t *link_exclusion; - qdr_address_t *tracking_addr; - int tracking_addr_bit; - qdr_link_work_t *link_work; ///< Delivery work item for this delivery -}; - -ALLOC_DECLARE(qdr_delivery_t); -DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); - typedef struct qdr_delivery_ref_t { DEQ_LINKS(struct qdr_delivery_ref_t); 
qdr_delivery_t *dlv; @@ -339,7 +311,38 @@ typedef struct qdr_delivery_ref_t { ALLOC_DECLARE(qdr_delivery_ref_t); DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t); -void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv); +struct qdr_delivery_t { + DEQ_LINKS(qdr_delivery_t); + void *context; + sys_atomic_t ref_count; + qdr_link_t *link; + qdr_delivery_t *peer; /// Use this peer if the delivery has one and only one peer. + qdr_delivery_ref_t *next_peer_ref; + qd_message_t *msg; + qd_iterator_t *to_addr; + qd_iterator_t *origin; + uint64_t disposition; + pn_data_t *extension_state; + qdr_error_t *error; + bool settled; + bool presettled; + bool cleared_proton_ref; + qdr_delivery_where_t where; + uint8_t tag[32]; + int tag_length; + qd_bitmask_t *link_exclusion; + qdr_address_t *tracking_addr; + int tracking_addr_bit; + qdr_link_work_t *link_work; ///< Delivery work item for this delivery + qdr_delivery_ref_list_t peers; /// Use this list if the delivery has more than one peer. 
+ +}; + +ALLOC_DECLARE(qdr_delivery_t); +DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); + + +void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv); void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t *ref); #define QDR_LINK_LIST_CLASS_ADDRESS 0 http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 7dc4970..5ae67c8 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -412,14 +412,38 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de free_qdr_delivery_t(delivery); } +static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv) +{ + return dlv->peer || DEQ_SIZE(dlv->peers) > 0; +} + void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) { - assert(!in_dlv->peer); - assert(!out_dlv->peer); + // If there is no delivery or a peer, we cannot link each other. + if (!in_dlv || !out_dlv) + return; + + if (!qdr_delivery_has_peer_CT(in_dlv)) { + // This is the very first peer. Link them up. + assert(!out_dlv->peer); + in_dlv->peer = out_dlv; + } + else { + if (in_dlv->peer) { + // This is the first time we know that in_dlv is going to have more than one peer. + // There is already a peer in the in_dlv->peer pointer, move it into a list and zero it out. + qdr_add_delivery_ref_CT(&in_dlv->peers, in_dlv->peer); + + // Zero out the peer pointer. Since there is more than one peer, this peer has been moved to the "peers" linked list. + // All peers will now reside in the peers linked list. No need to decref/incref here because you are transferring ownership. 
+ in_dlv->peer = 0; + } + + qdr_add_delivery_ref_CT(&in_dlv->peers, out_dlv); + } out_dlv->peer = in_dlv; - in_dlv->peer = out_dlv; qdr_delivery_incref(out_dlv); qdr_delivery_incref(in_dlv); @@ -428,10 +452,13 @@ void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer) { + + // If there is no delivery or a peer, we cannot proceed. + if (!dlv || !peer) + return; // // Make sure that the passed in deliveries are indeed peers. // - assert(dlv->peer == peer); assert(peer->peer == dlv); @@ -445,16 +472,42 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv) { - dlv->next_peer = dlv->peer; - return dlv->next_peer; + // What if there are no peers for this delivery? + if (!qdr_delivery_has_peer_CT(dlv)) + return 0; + + if (dlv->peer) { + // If there is a dlv->peer, it is the one and only peer. + return dlv->peer; + } + else { + // The delivery has more than one peer. + qdr_delivery_ref_t *dlv_ref = DEQ_HEAD(dlv->peers); + + // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT + dlv->next_peer_ref = DEQ_NEXT(dlv_ref); + + // Return the first peer. + return dlv_ref->dlv; + } } qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv) { - //Get the next peer. In the current case we have no next peer. - // When a peer list is introduced, this function might return something based on the content of the peer list. - dlv->next_peer = 0; - return dlv->next_peer; + if (dlv->peer) { + // There is no next_peer if there is only one peer. If there is a non-zero dlv->peer, it is the only peer + return 0; + } + else { + // There is more than one peer to this delivery. 
+ qdr_delivery_ref_t *next_peer_ref = dlv->next_peer_ref; + if (next_peer_ref) { + // Save the next peer to dlv->next_peer_ref so we can use it when somebody calls qdr_delivery_next_peer_CT + dlv->next_peer_ref = DEQ_NEXT(dlv->next_peer_ref); + return next_peer_ref->dlv; + } + return 0; + } } @@ -883,7 +936,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) sys_mutex_lock(link->conn->work_lock); if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { qdr_delivery_incref(dlv); - qdr_add_delivery_ref(&link->updated_deliveries, dlv); + qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv); qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); activate = true; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org