This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new f31d69b  DISPATCH-1297: Fix the outgoing buffer reference counting
f31d69b is described below

commit f31d69b6bea62d6142413c24ac0b2bdd016923d1
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Thu Mar 14 12:00:14 2019 -0400

    DISPATCH-1297: Fix the outgoing buffer reference counting
    
    The older code did not correctly account for the loss of a multicast
    consumer.  It could end up releasing a buffer before it was sent to
    all consumers.
    
    This closes #473
---
 include/qpid/dispatch/atomic.h    |  30 ++++
 include/qpid/dispatch/buffer.h    |  16 +-
 include/qpid/dispatch/message.h   |  20 +--
 src/buffer.c                      |  17 ++-
 src/message.c                     | 308 ++++++++++++++++++++++----------------
 src/message_private.h             |   7 +-
 src/router_core/connections.c     |   4 -
 src/router_core/forwarder.c       |  60 ++++----
 src/router_core/transfer.c        |   6 -
 tests/system_tests_edge_router.py |   6 +-
 10 files changed, 278 insertions(+), 196 deletions(-)

diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
index 08d63f1..db41b40 100644
--- a/include/qpid/dispatch/atomic.h
+++ b/include/qpid/dispatch/atomic.h
@@ -53,6 +53,12 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
     return atomic_load(ref);
 }
 
+static inline int32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value)
+{
+    return atomic_exchange(ref, value);
+}
+
+
 static inline void sys_atomic_destroy(sys_atomic_t *ref) {}
 
 
@@ -84,6 +90,15 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
     return *ref;
 }
 
+static inline uint32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value)
+{
+    uint32_t old = *ref;
+    while (!__sync_bool_compare_and_swap(ref, old, value)) {
+        old = *ref;
+    }
+    return old;
+}
+
 static inline void sys_atomic_destroy(sys_atomic_t *ref) {}
 
 
@@ -120,6 +135,11 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
     return *ref;
 }
 
+static inline void sys_atomic_set(sys_atomic_t *ref, uint32_t value)
+{
+    return atomic_swap_32(ref, value);
+}
+
 static inline void sys_atomic_destroy(sys_atomic_t *ref) {}
 
 #else
@@ -167,6 +187,16 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref)
     return value;
 }
 
+static inline uint32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value)
+{
+    uint32_t old;
+    sys_mutex_lock(ref->lock);
+    old = ref->value;
+    ref->value = value;
+    sys_mutex_unlock(ref->lock);
+    return old;
+}
+
 static inline void sys_atomic_destroy(sys_atomic_t *ref)
 {
     sys_mutex_lock(ref->lock);
diff --git a/include/qpid/dispatch/buffer.h b/include/qpid/dispatch/buffer.h
index d4fcd15..79a4a2a 100644
--- a/include/qpid/dispatch/buffer.h
+++ b/include/qpid/dispatch/buffer.h
@@ -121,15 +121,23 @@ void qd_buffer_list_free_buffers(qd_buffer_list_t *list);
  */
 unsigned int qd_buffer_list_length(const qd_buffer_list_t *list);
 
-/*
+/**
+ * Set the fanout value on the buffer.
+ * @return the _old_ count before updating
+ */
+uint32_t qd_buffer_set_fanout(qd_buffer_t *buf, uint32_t value);
+
+/**
  * Increase the fanout by 1. How many receivers should this buffer be sent to.
+ * @return the _old_ count (pre increment)
  */
-void qd_buffer_add_fanout(qd_buffer_t *buf);
+uint32_t qd_buffer_inc_fanout(qd_buffer_t *buf);
 
 /**
- * Return the buffer's fanout.
+ * Decrease the fanout by one
+ * @return the _old_ count (pre decrement)
  */
-size_t qd_buffer_fanout(qd_buffer_t *buf);
+uint32_t qd_buffer_dec_fanout(qd_buffer_t *buf);
 
 /**
  * Advance the buffer by len. Does not manipulate the contents of the buffer
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index c26caef..01e3913 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -360,24 +360,14 @@ bool qd_message_tag_sent(qd_message_t *msg);
 void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent);
 
 /**
- * Get the number of receivers for this message.
- *
- * @param msg A pointer to the message.
- */
-size_t qd_message_fanout(qd_message_t *msg);
-
-/**
  * Increase the fanout of the message by 1.
  *
- * @param msg A pointer to the message.
- */
-void qd_message_add_fanout(qd_message_t *msg);
-
-/**
- * Increments the num_closed_receivers by 1. This is necessary to track the 
number of receivers that
- * dropped out during or just before transmission of a large message.
+ * @param in_msg A pointer to the inbound message.
+ * @param out_msg A pointer to the outbound message or 0 if forwarding to a
+ * local subscriber.
  */
-void qd_message_add_num_closed_receivers(qd_message_t *in_msg);
+void qd_message_add_fanout(qd_message_t *in_msg,
+                           qd_message_t *out_msg);
 
 /**
  * Disable the Q2-holdoff for this message.
diff --git a/src/buffer.c b/src/buffer.c
index e404850..a127953 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -53,6 +53,7 @@ qd_buffer_t *qd_buffer(void)
 void qd_buffer_free(qd_buffer_t *buf)
 {
     if (!buf) return;
+    sys_atomic_destroy(&buf->bfanout);
     free_qd_buffer_t(buf);
 }
 
@@ -87,14 +88,22 @@ void qd_buffer_insert(qd_buffer_t *buf, size_t len)
     assert(buf->size <= BUFFER_SIZE);
 }
 
-void qd_buffer_add_fanout(qd_buffer_t *buf)
+
+uint32_t qd_buffer_set_fanout(qd_buffer_t *buf, uint32_t value)
+{
+    return sys_atomic_set(&buf->bfanout, value);
+}
+
+
+uint32_t qd_buffer_inc_fanout(qd_buffer_t *buf)
 {
-    sys_atomic_inc(&buf->bfanout);
+    return sys_atomic_inc(&buf->bfanout);
 }
 
-size_t qd_buffer_fanout(qd_buffer_t *buf)
+
+uint32_t qd_buffer_dec_fanout(qd_buffer_t *buf)
 {
-    return buf->bfanout;
+    return sys_atomic_dec(&buf->bfanout);
 }
 
 
diff --git a/src/message.c b/src/message.c
index c342485..36d7d09 100644
--- a/src/message.c
+++ b/src/message.c
@@ -614,7 +614,6 @@ static int qd_check_and_advance(qd_buffer_t         
**buffer,
     //
     // Pattern matched and tag is expected.  Mark the beginning of the section.
     //
-    location->parsed     = 1;
     location->buffer     = *buffer;
     location->offset     = *cursor - qd_buffer_base(*buffer);
     location->length     = 0;
@@ -664,6 +663,30 @@ static int qd_check_and_advance(qd_buffer_t         
**buffer,
     if (consume)
         advance(&test_cursor, &test_buffer, consume);
 
+    //
+    // increment the reference count of the parsed section as location now
+    // references it. Note that the cursor has advanced to the octet after the
+    // parsed section, so be careful not to include an extra buffer past the
+    // end
+    //
+    qd_buffer_t *start = *buffer;
+    qd_buffer_t *last = test_buffer;
+    if (last != start && last != 0) {
+        if (test_cursor == qd_buffer_base(last)) {
+            // last does not include octets for the current section
+            last = DEQ_PREV(last);
+        }
+    }
+
+    while (start) {
+        qd_buffer_inc_fanout(start);
+        if (start == last)
+            break;
+        start = DEQ_NEXT(start);
+    }
+
+    location->parsed     = 1;
+
     *cursor = test_cursor;
     *buffer = test_buffer;
     return 1;
@@ -874,6 +897,7 @@ qd_message_t *qd_message()
     msg->cursor.cursor = 0;
     msg->send_complete = false;
     msg->tag_sent      = false;
+    msg->is_fanout     = false;
 
     msg->content = new_qd_message_content_t();
 
@@ -917,20 +941,35 @@ void qd_message_free(qd_message_t *in_msg)
         if (content->ma_pf_trace)
             qd_parse_free(content->ma_pf_trace);
 
-        qd_buffer_t *buf = DEQ_HEAD(content->buffers);
-        while (buf) {
-            DEQ_REMOVE_HEAD(content->buffers);
-            qd_buffer_free(buf);
-            buf = DEQ_HEAD(content->buffers);
-        }
+        qd_buffer_list_free_buffers(&content->buffers);
 
         if (content->pending)
             qd_buffer_free(content->pending);
 
         sys_mutex_free(content->lock);
         free_qd_message_content_t(content);
-    }
 
+    } else if (msg->is_fanout) {
+        //
+        // Adjust the content's fanout count and decrement all buffer fanout
+        // counts starting with the msg cursor.  If the buffer count drops to
+        // zero we can free it.
+        //
+        LOCK(content->lock);
+
+        qd_buffer_t *buf = msg->cursor.buffer;
+        while (buf) {
+            qd_buffer_t *next_buf = DEQ_NEXT(buf);
+            if (qd_buffer_dec_fanout(buf) == 1) {
+                DEQ_REMOVE(content->buffers, buf);
+                qd_buffer_free(buf);
+            }
+            buf = next_buf;
+        }
+        --content->fanout;
+
+        UNLOCK(content->lock);
+    }
     free_qd_message_t((qd_message_t*) msg);
 }
 
@@ -958,6 +997,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     copy->cursor.cursor = 0;
     copy->send_complete = false;
     copy->tag_sent      = false;
+    copy->is_fanout     = false;
 
     qd_message_message_annotations((qd_message_t*) copy);
 
@@ -1062,26 +1102,29 @@ void qd_message_set_discard(qd_message_t *msg, bool 
discard)
     pvt_msg->content->discard = discard;
 }
 
-size_t qd_message_fanout(qd_message_t *in_msg)
-{
-    if (!in_msg)
-        return 0;
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    return msg->content->fanout;
-}
 
-void qd_message_add_fanout(qd_message_t *in_msg)
+void qd_message_add_fanout(qd_message_t *in_msg,
+                           qd_message_t *out_msg)
 {
-    assert(in_msg);
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    sys_atomic_inc(&msg->content->fanout);
-}
 
-void qd_message_add_num_closed_receivers(qd_message_t *in_msg)
-{
+    // out_msg will be 0 if we are forwarding to an internal subscriber (like
+    // $management).  If so we treat in_msg like an out_msg
     assert(in_msg);
-    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
-    msg->content->num_closed_receivers++;
+    qd_message_pvt_t *msg = (qd_message_pvt_t *)((out_msg) ? out_msg : in_msg);
+    msg->is_fanout = true;
+
+    qd_message_content_t *content = msg->content;
+
+    LOCK(content->lock);
+    ++content->fanout;
+
+    // do not free the buffers until all fanout consumers are done with them
+    qd_buffer_t *buf = DEQ_HEAD(content->buffers);
+    while (buf) {
+        qd_buffer_inc_fanout(buf);
+        buf = DEQ_NEXT(buf);
+    }
+    UNLOCK(content->lock);
 }
 
 
@@ -1236,6 +1279,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
     }
 
     // Loop until msg is complete, error seen, or incoming bytes are consumed
+    qd_message_content_t *content = msg->content;
     bool recv_error = false;
     while (1) {
         //
@@ -1247,56 +1291,58 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
 
         if (at_eos || recv_error) {
             // Message is complete
-            LOCK(msg->content->lock);
+            LOCK(content->lock);
             {
                 // Append last buffer if any with data
-                if (msg->content->pending) {
-                    if (qd_buffer_size(msg->content->pending) > 0) {
-                        // pending buffer has bytes that are port of message
-                        DEQ_INSERT_TAIL(msg->content->buffers,
-                                        msg->content->pending);
+                if (content->pending) {
+                    if (qd_buffer_size(content->pending) > 0) {
+                        // pending buffer has bytes that are part of message
+                        qd_buffer_set_fanout(content->pending, 
content->fanout);
+                        DEQ_INSERT_TAIL(content->buffers,
+                                        content->pending);
                     } else {
                         // pending buffer is empty
-                        qd_buffer_free(msg->content->pending);
+                        qd_buffer_free(content->pending);
                     }
-                    msg->content->pending = 0;
+                    content->pending = 0;
                 } else {
                     // pending buffer is absent
                 }
 
-                msg->content->receive_complete = true;
-                msg->content->aborted = pn_delivery_aborted(delivery);
-                msg->content->input_link = 0;
+                content->receive_complete = true;
+                content->aborted = pn_delivery_aborted(delivery);
+                content->input_link = 0;
 
                 // unlink message and delivery
                 pn_record_set(record, PN_DELIVERY_CTX, 0);
             }
-            UNLOCK(msg->content->lock);
+            UNLOCK(content->lock);
             break;
         }
 
         //
         // Handle a missing or full pending buffer
         //
-        if (!msg->content->pending) {
+        if (!content->pending) {
             // Pending buffer is absent: get a new one
-            msg->content->pending = qd_buffer();
+            content->pending = qd_buffer();
         } else {
             // Pending buffer exists
-            if (qd_buffer_capacity(msg->content->pending) == 0) {
+            if (qd_buffer_capacity(content->pending) == 0) {
                 // Pending buffer is full
-                LOCK(msg->content->lock);
-                DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending);
-                msg->content->pending = 0;
+                LOCK(content->lock);
+                qd_buffer_set_fanout(content->pending, content->fanout);
+                DEQ_INSERT_TAIL(content->buffers, content->pending);
+                content->pending = 0;
                 if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) {
                     if (!qd_link_is_q2_limit_unbounded(qdl)) {
-                        msg->content->q2_input_holdoff = true;
-                        UNLOCK(msg->content->lock);
+                        content->q2_input_holdoff = true;
+                        UNLOCK(content->lock);
                         break;
                     }
                 }
-                UNLOCK(msg->content->lock);
-                msg->content->pending = qd_buffer();
+                UNLOCK(content->lock);
+                content->pending = qd_buffer();
             } else {
                 // Pending buffer still has capacity
             }
@@ -1306,8 +1352,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         // Try to fill the remaining space in the pending buffer.
         //
         rc = pn_link_recv(link,
-                          (char*) qd_buffer_cursor(msg->content->pending),
-                          qd_buffer_capacity(msg->content->pending));
+                          (char*) qd_buffer_cursor(content->pending),
+                          qd_buffer_capacity(content->pending));
 
         if (rc < 0) {
             // error or eos seen. next pass breaks out of loop
@@ -1317,7 +1363,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
             // We have received a positive number of bytes for the message.  
Advance
             // the cursor in the buffer.
             //
-            qd_buffer_insert(msg->content->pending, rc);
+            qd_buffer_insert(content->pending, rc);
         } else {
             //
             // We received zero bytes, and no PN_EOS.  This means that we've 
received
@@ -1479,7 +1525,7 @@ void qd_message_send(qd_message_t *in_msg,
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
-        if (msg->content->aborted) {
+        if (content->aborted) {
             // Message is aborted before any part of it has been sent.
             // Declare the message to be sent,
             msg->send_complete = true;
@@ -1585,98 +1631,108 @@ void qd_message_send(qd_message_t *in_msg,
 
     pn_session_t     *pns  = pn_link_session(pnl);
 
-    while (msg->content->aborted ||
-           (buf &&
-            (msg->cursor.cursor < qd_buffer_cursor(buf) || buf->next != 0) &&
-            pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER)) {
-
-        if (msg->content->aborted) {
-            if (pn_link_current(pnl)) {
-                msg->send_complete = true;
-                if (!pn_delivery_aborted(pn_link_current(pnl))) {
-                    pn_delivery_abort(pn_link_current(pnl));
-                }
-            }
-            break;
-        }
+    while (!content->aborted
+           && buf
+           && pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER) {
 
+        // This will send the remaining data in the buffer if any. There may be
+        // zero bytes left to send if we stopped here last time and there was
+        // no next buf
+        //
         size_t buf_size = qd_buffer_size(buf);
-
-        // This will send the remaining data in the buffer if any.
         int num_bytes_to_send = buf_size - (msg->cursor.cursor - 
qd_buffer_base(buf));
+        ssize_t bytes_sent = 0;
         if (num_bytes_to_send > 0) {
-            // We are deliberately avoiding the return value of pn_link_send 
because we can't do anything nice with it.
-            (void) pn_link_send(pnl, (const char*)msg->cursor.cursor, 
num_bytes_to_send);
+            bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, 
num_bytes_to_send);
         }
 
-        // If the entire message has already been received,  taking out this 
lock is not that expensive
-        // because there is no contention for this lock.
-        LOCK(msg->content->lock);
-
-        qd_buffer_t *next_buf = DEQ_NEXT(buf);
-        if (next_buf) {
-            // There is a next buffer, the previous buffer has been fully sent 
by now.
-            qd_buffer_add_fanout(buf);
-
-            if (qd_message_fanout(in_msg) - msg->content->num_closed_receivers 
== qd_buffer_fanout(buf)) {
-                qd_buffer_t *local_buf = DEQ_HEAD(content->buffers);
-                while (local_buf && local_buf != next_buf) {
-                    DEQ_REMOVE_HEAD(content->buffers);
-                    qd_buffer_free(local_buf);
-                    if (!msg->content->buffers_freed)
-                        msg->content->buffers_freed = true;
-
-                    local_buf = DEQ_HEAD(content->buffers);
-
-                    // by freeing a buffer there now may be room to restart a
-                    // stalled message receiver
-                    if (msg->content->q2_input_holdoff) {
-                        if (qd_message_Q2_holdoff_should_unblock((qd_message_t 
*)msg)) {
-                            // wake up receive side
-                            // Note: clearing holdoff here is easy compared to
-                            // clearing it in the deferred callback. Tracing
-                            // shows that rx_handler may run and subsequently
-                            // set input holdoff before the deferred handler
-                            // runs.
-                            msg->content->q2_input_holdoff = false;
-                            *restart_rx = true;
-                        }
-                    }
-                }
+        LOCK(content->lock);
+
+        if (bytes_sent < 0) {
+            //
+            // send error - likely the link has failed and we will eventually
+            // get a link detach event for this link
+            //
+            content->aborted = true;
+            msg->send_complete = true;
+            if (!pn_delivery_aborted(pn_link_current(pnl))) {
+                pn_delivery_abort(pn_link_current(pnl));
             }
-            msg->cursor.buffer = next_buf;
-            msg->cursor.cursor = qd_buffer_base(next_buf);
-        }
-        else {
-            // There is no next_buf
-            if (qd_message_receive_complete(in_msg)) {
+
+            qd_log(qd_message_log_source(),
+                   QD_LOG_WARNING,
+                   "Sending data on link %s has failed (code=%zi)",
+                   pn_link_name(pnl), bytes_sent);
+
+        } else {
+
+            msg->cursor.cursor += bytes_sent;
+
+            if (bytes_sent == num_bytes_to_send) {
                 //
-                // There is no more of the message coming, this means
-                // that we have completely sent out the message.
+                // sent the whole buffer.
+                // Can we move to the next buffer?  Only if there is a next 
buffer
+                // or we are at the end and done sending this message
                 //
-                msg->send_complete = true;
-                msg->cursor.buffer = 0;
-                msg->cursor.cursor = 0;
+                qd_buffer_t *next_buf = DEQ_NEXT(buf);
+                bool complete = qd_message_receive_complete(in_msg);
+
+                if (next_buf || complete) {
+                    //
+                    // this buffer may be freed if there are no more 
references to it
+                    //
+                    uint32_t ref_count = (msg->is_fanout) ? 
qd_buffer_dec_fanout(buf) : 1;
+                    if (ref_count == 1) {
+
+                        DEQ_REMOVE(content->buffers, buf);
+                        qd_buffer_free(buf);
+                        ++content->buffers_freed;
+
+                        // by freeing a buffer there now may be room to 
restart a
+                        // stalled message receiver
+                        if (content->q2_input_holdoff) {
+                            if 
(qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) {
+                                // wake up receive side
+                                // Note: clearing holdoff here is easy 
compared to
+                                // clearing it in the deferred callback. 
Tracing
+                                // shows that rx_handler may run and 
subsequently
+                                // set input holdoff before the deferred 
handler
+                                // runs.
+                                content->q2_input_holdoff = false;
+                                *restart_rx = true;
+                            }
+                        }
+                    }   // end free buffer
 
-                if (msg->content->aborted) {
-                    if (!pn_delivery_aborted(pn_link_current(pnl))) {
-                        pn_delivery_abort(pn_link_current(pnl));
-                    }
+                    msg->cursor.buffer = next_buf;
+                    msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) 
: 0;
+
+                    msg->send_complete = (complete && !next_buf);
                 }
-            }
-            else {
+
+                buf = next_buf;
+
+            } else if (num_bytes_to_send && bytes_sent == 0) {
                 //
-                // There is more of the message to come, update your cursor 
pointers
-                // you will come back into this function to deliver more as 
bytes arrive
+                // the proton link cannot take anymore data,
+                // retry later...
                 //
-                msg->cursor.buffer = buf;
-                msg->cursor.cursor = qd_buffer_at(buf, buf_size);
+                buf = 0;
+                qd_log(qd_message_log_source(), QD_LOG_DEBUG,
+                       "Link %s output limit reached", pn_link_name(pnl));
             }
         }
 
-        UNLOCK(msg->content->lock);
+        UNLOCK(content->lock);
+    }
 
-        buf = next_buf;
+    if (content->aborted) {
+        if (pn_link_current(pnl)) {
+            msg->send_complete = true;
+            if (!pn_delivery_aborted(pn_link_current(pnl))) {
+                pn_delivery_abort(pn_link_current(pnl));
+            }
+        }
     }
 
     *q3_stalled = (pn_session_outgoing_bytes(pns) > QD_QLIMIT_Q3_UPPER);
@@ -1710,7 +1766,7 @@ static bool qd_message_check_LH(qd_message_content_t 
*content, qd_message_depth_
     qd_error_clear();
 
     //
-    // In the case of a streaming or multi buffer message, there is a change 
that some buffers might be freed before the entire
+    // In the case of a streaming or multi buffer message, there is a chance 
that some buffers might be freed before the entire
     // message has arrived in which case we cannot reliably check the message 
using the depth.
     //
     if (content->buffers_freed)
diff --git a/src/message_private.h b/src/message_private.h
index 2402e58..850c534 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -69,6 +69,8 @@ typedef struct {
     sys_atomic_t         ref_count;                       // The number of 
messages referencing this
     qd_buffer_list_t     buffers;                         // The buffer chain 
containing the message
     qd_buffer_t         *pending;                         // Buffer owned by 
and filled by qd_message_receive
+    uint64_t             buffers_freed;                   // count of large 
msg buffers freed on send
+
     qd_field_location_t  section_message_header;          // The message 
header list
     qd_field_location_t  section_delivery_annotation;     // The delivery 
annotation map
     qd_field_location_t  section_message_annotation;      // The message 
annotation map
@@ -107,8 +109,7 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
-    sys_atomic_t         fanout;                         // The number of 
receivers for this message. This number does not include in-process subscribers.
-    int                  num_closed_receivers;
+    uint32_t             fanout;                         // The number of 
receivers for this message, including in-process subscribers.
     qd_link_t           *input_link;                     // message received 
on this link
 
     bool                 ma_parsed;                      // have parsed 
annotations in incoming message
@@ -117,7 +118,6 @@ typedef struct {
     bool                 q2_input_holdoff;               // hold off calling 
pn_link_recv
     bool                 aborted;                        // receive completed 
with abort flag set
     bool                 disable_q2_holdoff;             // Disable the Q2 
flow control
-    bool                 buffers_freed;                  // Has at least one 
buffer been freed ?
     bool                 priority_parsed;
     bool                 priority_present;
     uint8_t              priority;                       // The priority of 
this message
@@ -136,6 +136,7 @@ typedef struct {
     bool                  strip_annotations_in;
     bool                  send_complete;   // Has the message been completely 
received and completely sent?
     bool                  tag_sent;        // Tags are sent
+    bool                  is_fanout;       // If msg is an outgoing fanout
 } qd_message_pvt_t;
 
 ALLOC_DECLARE(qd_message_t);
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 6bdc781..24741fe 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -719,10 +719,6 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t 
*core, qdr_connection_t *c
             peer = qdr_delivery_next_peer_CT(dlv);
         }
 
-        if (dlv->link->link_direction == QD_OUTGOING) {
-            qdr_delivery_add_num_closed_receivers(dlv);
-        }
-
         //
         // Updates global and link level delivery counters like 
presettled_deliveries, accepted_deliveries, released_deliveries etc
         //
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1d6a208..a58e165 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -131,7 +131,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t 
*core, qdr_delivery_t *in
     //
     // Add one to the message fanout. This will later be used in the 
qd_message_send function that sends out messages.
     //
-    qd_message_add_fanout(msg);
+    qd_message_add_fanout(msg, out_dlv->msg);
 
     //
     // Create peer linkage if the outgoing delivery is unsettled. This peer 
linkage is necessary to deal with dispositions that show up in the future.
@@ -302,6 +302,32 @@ static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t 
*in_dlv, qdr_link_t *
 }
 
 
+/**
+ * Handle forwarding to a subscription
+ */
+static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t 
*sub, qdr_delivery_t *in_dlv, qd_message_t *in_msg, bool receive_complete)
+{
+    qd_message_add_fanout(in_msg, 0);
+
+    //
+    // Only if the message has been completely received, forward it to the 
subscription
+    // Subscriptions, at the moment, dont have the ability to deal with 
partial messages
+    //
+    if (receive_complete)
+        qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, 
in_msg);
+    else {
+        //
+        // Receive is not complete, we will store the sub in
+        // in_dlv->subscriptions so we can send the message to the subscription
+        // after the message fully arrives
+        //
+        assert(in_dlv);
+        DEQ_INSERT_TAIL(in_dlv->subscriptions, sub);
+        qd_message_Q2_holdoff_disable(in_msg);
+    }
+}
+
+
 int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
@@ -450,21 +476,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         //
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         while (sub) {
-            //
-            // Only if the message has been completely received, forward it to 
the subscription
-            // Subscriptions, at the moment, dont have the ability to deal 
with partial messages
-            //
-            if (receive_complete)
-                qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
-            else {
-                //
-                // Receive is not complete, we will store the sub in 
in_delivery->subscriptions so we can send the message to the subscription
-                // after the message fully arrives
-                //
-                DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
-                qd_message_Q2_holdoff_disable(msg);
-            }
-
+            qdr_forward_to_subscriber(core, sub, in_delivery, msg, 
receive_complete);
             fanout++;
             addr->deliveries_to_container++;
             sub = DEQ_NEXT(sub);
@@ -520,21 +532,7 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
         bool receive_complete = qd_message_receive_complete(msg);
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         if (sub) {
-
-            //
-            // Only if the message has been completely received, forward it.
-            // Subscriptions, at the moment, dont have the ability to deal 
with partial messages
-            //
-            if (receive_complete)
-                qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
-            else {
-                //
-                // Receive is not complete, we will store the sub in 
in_delivery->subscriptions so we can send the message to the subscription
-                // after the message fully arrives
-                //
-                DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
-                qd_message_Q2_holdoff_disable(msg);
-            }
+            qdr_forward_to_subscriber(core, sub, in_delivery, msg, 
receive_complete);
 
             //
             // If the incoming delivery is not settled, it should be accepted 
and settled here.
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 4c843e4..b186c09 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -394,12 +394,6 @@ bool qdr_delivery_is_aborted(const qdr_delivery_t 
*delivery)
     return qd_message_aborted(delivery->msg);
 }
 
-void qdr_delivery_add_num_closed_receivers(qdr_delivery_t *delivery)
-{
-    assert(delivery);
-    qd_message_add_num_closed_receivers(delivery->msg);
-}
-
 
 void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const 
char *label)
 {
diff --git a/tests/system_tests_edge_router.py 
b/tests/system_tests_edge_router.py
index a8895c1..d640ff2 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -1393,8 +1393,8 @@ class MobileAddressMulticastTest(MessagingHandler):
             self.check_addr_host = self.sender_host
 
         if self.large_msg:
-            for i in range(10000):
-                self.body += "0123456789101112131415"
+            self.body = "0123456789101112131415" * 10000
+            self.properties = {'big field': 'X' * 32000}
 
     def timeout(self):
         if self.dup_msg:
@@ -1468,7 +1468,7 @@ class MobileAddressMulticastTest(MessagingHandler):
         while self.n_sent < self.count:
             msg = None
             if self.large_msg:
-                msg = Message(body=self.body)
+                msg = Message(body=self.body, properties=self.properties)
             else:
                 msg = Message(body="Message %d" % self.n_sent)
             msg.correlation_id = self.n_sent


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to