Repository: qpid-dispatch
Updated Branches:
  refs/heads/master db96fd8f1 -> 653628894


DISPATCH-788 - Part 2 - Added a list of peer references in case there is more
than one peer. This will help in the future when we introduce large message
streaming.

(cherry picked from commit 33aab339be641e0803631018df10a30550be2e2d)


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/65362889
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/65362889
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/65362889

Branch: refs/heads/master
Commit: 6536288944d0051e407c6ce57c4cef8bc7dd0a88
Parents: db96fd8
Author: Ganesh Murthy <gmur...@redhat.com>
Authored: Tue Jun 20 14:24:30 2017 -0400
Committer: Ganesh Murthy <gmur...@redhat.com>
Committed: Wed Jun 21 17:08:00 2017 -0400

----------------------------------------------------------------------
 src/router_core/forwarder.c           |  4 +-
 src/router_core/router_core.c         |  2 +-
 src/router_core/router_core_private.h | 61 ++++++++++++------------
 src/router_core/transfer.c            | 75 +++++++++++++++++++++++++-----
 4 files changed, 99 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 7828c6a..e31a97a 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -110,14 +110,14 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
     out_dlv->msg        = qd_message_copy(msg);
     out_dlv->settled    = !in_dlv || in_dlv->settled;
     out_dlv->presettled = out_dlv->settled;
-    *tag            = core->next_tag++;
+    *tag                = core->next_tag++;
     out_dlv->tag_length = 8;
     out_dlv->error      = 0;
 
     //
     // Create peer linkage only if the outgoing delivery is not settled
     //
-    if (!out_dlv->settled && in_dlv)
+    if (!out_dlv->settled)
         qdr_delivery_link_peers_CT(in_dlv, out_dlv);
 
     return out_dlv;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 9fd47ae..76e2270 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -382,7 +382,7 @@ void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_
 }
 
 
-void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv)
+void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t 
*dlv)
 {
     qdr_delivery_ref_t *ref = new_qdr_delivery_ref_t();
     DEQ_ITEM_INIT(ref);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index f33aeda..61e5afc 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -303,34 +303,6 @@ typedef enum {
     QDR_DELIVERY_IN_UNSETTLED
 } qdr_delivery_where_t;
 
-struct qdr_delivery_t {
-    DEQ_LINKS(qdr_delivery_t);
-    void                *context;
-    sys_atomic_t         ref_count;
-    qdr_link_t          *link;
-    qdr_delivery_t      *peer;
-    qdr_delivery_t      *next_peer;
-    qd_message_t        *msg;
-    qd_iterator_t       *to_addr;
-    qd_iterator_t       *origin;
-    uint64_t             disposition;
-    pn_data_t           *extension_state;
-    qdr_error_t         *error;
-    bool                 settled;
-    bool                 presettled;
-    bool                 cleared_proton_ref;
-    qdr_delivery_where_t where;
-    uint8_t              tag[32];
-    int                  tag_length;
-    qd_bitmask_t        *link_exclusion;
-    qdr_address_t       *tracking_addr;
-    int                  tracking_addr_bit;
-    qdr_link_work_t     *link_work;         ///< Delivery work item for this 
delivery
-};
-
-ALLOC_DECLARE(qdr_delivery_t);
-DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
-
 typedef struct qdr_delivery_ref_t {
     DEQ_LINKS(struct qdr_delivery_ref_t);
     qdr_delivery_t *dlv;
@@ -339,7 +311,38 @@ typedef struct qdr_delivery_ref_t {
 ALLOC_DECLARE(qdr_delivery_ref_t);
 DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t);
 
-void qdr_add_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_t *dlv);
+struct qdr_delivery_t {
+    DEQ_LINKS(qdr_delivery_t);
+    void                   *context;
+    sys_atomic_t            ref_count;
+    qdr_link_t             *link;
+    qdr_delivery_t         *peer;          /// Use this peer if the delivery 
has one and only one peer.
+    qdr_delivery_ref_t     *next_peer_ref;
+    qd_message_t           *msg;
+    qd_iterator_t          *to_addr;
+    qd_iterator_t          *origin;
+    uint64_t                disposition;
+    pn_data_t              *extension_state;
+    qdr_error_t            *error;
+    bool                    settled;
+    bool                    presettled;
+    bool                    cleared_proton_ref;
+    qdr_delivery_where_t    where;
+    uint8_t                 tag[32];
+    int                     tag_length;
+    qd_bitmask_t           *link_exclusion;
+    qdr_address_t          *tracking_addr;
+    int                     tracking_addr_bit;
+    qdr_link_work_t        *link_work;         ///< Delivery work item for 
this delivery
+    qdr_delivery_ref_list_t peers;             /// Use this list if there if 
the delivery has more than one peer.
+
+};
+
+ALLOC_DECLARE(qdr_delivery_t);
+DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
+
+
+void qdr_add_delivery_ref_CT(qdr_delivery_ref_list_t *list, qdr_delivery_t 
*dlv);
 void qdr_del_delivery_ref(qdr_delivery_ref_list_t *list, qdr_delivery_ref_t 
*ref);
 
 #define QDR_LINK_LIST_CLASS_ADDRESS    0

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/65362889/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7dc4970..5ae67c8 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -412,14 +412,38 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de
     free_qdr_delivery_t(delivery);
 }
 
+static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
+{
+    return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
+}
+
 
 void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t 
*out_dlv)
 {
-    assert(!in_dlv->peer);
-    assert(!out_dlv->peer);
+    // If there is no delivery or a peer, we cannot link each other.
+    if (!in_dlv || !out_dlv)
+        return;
+
+    if (!qdr_delivery_has_peer_CT(in_dlv)) {
+        // This is the very first peer. Link them up.
+        assert(!out_dlv->peer);
+        in_dlv->peer = out_dlv;
+    }
+    else {
+        if (in_dlv->peer) {
+            // This is the first time we know that in_dlv is going to have 
more than one peer.
+            // There is already a peer in the in_dlv->peer pointer, move it 
into a list and zero it out.
+            qdr_add_delivery_ref_CT(&in_dlv->peers, in_dlv->peer);
+
+            // Zero out the peer pointer. Since there is more than one peer, 
this peer has been moved to the "peers" linked list.
+            // All peers will now reside in the peers linked list. No need to 
decref/incref here because you are transferring ownership.
+            in_dlv->peer = 0;
+        }
+
+        qdr_add_delivery_ref_CT(&in_dlv->peers, out_dlv);
+    }
 
     out_dlv->peer = in_dlv;
-    in_dlv->peer = out_dlv;
 
     qdr_delivery_incref(out_dlv);
     qdr_delivery_incref(in_dlv);
@@ -428,10 +452,13 @@ void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv)
 
 void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, 
qdr_delivery_t *peer)
 {
+
+    // If there is no delivery or a peer, we cannot proceed.
+    if (!dlv || !peer)
+        return;
     //
     // Make sure that the passed in deliveries are indeed peers.
     //
-
     assert(dlv->peer == peer);
     assert(peer->peer == dlv);
 
@@ -445,16 +472,42 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del
 
 qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv)
 {
-    dlv->next_peer = dlv->peer;
-    return dlv->next_peer;
+    // What if there are no peers for this delivery?
+    if (!qdr_delivery_has_peer_CT(dlv))
+        return 0;
+
+    if (dlv->peer) {
+        // If there is a dlv->peer, it is the one and only peer.
+        return dlv->peer;
+    }
+    else {
+        // The delivery has more than one peer.
+        qdr_delivery_ref_t *dlv_ref = DEQ_HEAD(dlv->peers);
+
+        // Save the next peer to dlv->next_peer_ref so we can use it when 
somebody calls qdr_delivery_next_peer_CT
+        dlv->next_peer_ref = DEQ_NEXT(dlv_ref);
+
+        // Return the first peer.
+        return dlv_ref->dlv;
+    }
 }
 
 qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv)
 {
-    //Get the next peer. In the current case we have no next peer.
-    // When a peer list is introduced, this function might return something 
based on the content of the peer list.
-    dlv->next_peer = 0;
-    return dlv->next_peer;
+    if (dlv->peer) {
+        // There is no next_peer if there is only one peer. If there is a 
non-zero dlv->peer, it is the only peer
+        return 0;
+    }
+    else {
+        // There is more than one peer to this delivery.
+        qdr_delivery_ref_t *next_peer_ref = dlv->next_peer_ref;
+        if (next_peer_ref) {
+            // Save the next peer to dlv->next_peer_ref so we can use it when 
somebody calls qdr_delivery_next_peer_CT
+            dlv->next_peer_ref = DEQ_NEXT(dlv->next_peer_ref);
+            return next_peer_ref->dlv;
+        }
+        return 0;
+    }
 }
 
 
@@ -883,7 +936,7 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv)
     sys_mutex_lock(link->conn->work_lock);
     if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) {
         qdr_delivery_incref(dlv);
-        qdr_add_delivery_ref(&link->updated_deliveries, dlv);
+        qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv);
         qdr_add_link_ref(&link->conn->links_with_work, link, 
QDR_LINK_LIST_CLASS_WORK);
         activate = true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to