This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new f31d69b DISPATCH-1297: Fix the outgoing buffer reference counting f31d69b is described below commit f31d69b6bea62d6142413c24ac0b2bdd016923d1 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Thu Mar 14 12:00:14 2019 -0400 DISPATCH-1297: Fix the outgoing buffer reference counting The older code did not correctly account for the loss of a multicast consumer. It could end up releasing a buffer before it was sent to all consumers. This closes #473 --- include/qpid/dispatch/atomic.h | 30 ++++ include/qpid/dispatch/buffer.h | 16 +- include/qpid/dispatch/message.h | 20 +-- src/buffer.c | 17 ++- src/message.c | 308 ++++++++++++++++++++++---------------- src/message_private.h | 7 +- src/router_core/connections.c | 4 - src/router_core/forwarder.c | 60 ++++---- src/router_core/transfer.c | 6 - tests/system_tests_edge_router.py | 6 +- 10 files changed, 278 insertions(+), 196 deletions(-) diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h index 08d63f1..db41b40 100644 --- a/include/qpid/dispatch/atomic.h +++ b/include/qpid/dispatch/atomic.h @@ -53,6 +53,12 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref) return atomic_load(ref); } +static inline int32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value) +{ + return atomic_exchange(ref, value); +} + + static inline void sys_atomic_destroy(sys_atomic_t *ref) {} @@ -84,6 +90,15 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref) return *ref; } +static inline uint32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value) +{ + uint32_t old = *ref; + while (!__sync_bool_compare_and_swap(ref, old, value)) { + old = *ref; + } + return old; +} + static inline void sys_atomic_destroy(sys_atomic_t *ref) {} @@ -120,6 +135,11 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref) return *ref; } +static inline void sys_atomic_set(sys_atomic_t *ref, uint32_t value) +{ + return atomic_swap_32(ref, value); +} + static inline void sys_atomic_destroy(sys_atomic_t *ref) {} #else @@ -167,6 +187,16 @@ static inline uint32_t sys_atomic_get(sys_atomic_t *ref) return value; } +static inline uint32_t sys_atomic_set(sys_atomic_t *ref, uint32_t value) +{ + uint32_t old; + sys_mutex_lock(ref->lock); + old = ref->value; + ref->value = value; + sys_mutex_unlock(ref->lock); + return old; +} + static inline void sys_atomic_destroy(sys_atomic_t *ref) { sys_mutex_lock(ref->lock); diff --git a/include/qpid/dispatch/buffer.h b/include/qpid/dispatch/buffer.h index d4fcd15..79a4a2a 100644 --- a/include/qpid/dispatch/buffer.h +++ b/include/qpid/dispatch/buffer.h @@ -121,15 +121,23 @@ void qd_buffer_list_free_buffers(qd_buffer_list_t *list); */ unsigned int qd_buffer_list_length(const qd_buffer_list_t *list); -/* +/** + * Set the fanout value on the buffer. + * @return the _old_ count before updating + */ +uint32_t qd_buffer_set_fanout(qd_buffer_t *buf, uint32_t value); + +/** * Increase the fanout by 1. How many receivers should this buffer be sent to. + * @return the _old_ count (pre increment) */ -void qd_buffer_add_fanout(qd_buffer_t *buf); +uint32_t qd_buffer_inc_fanout(qd_buffer_t *buf); /** - * Return the buffer's fanout. + * Decrease the fanout by one + * @return the _old_ count (pre decrement) */ -size_t qd_buffer_fanout(qd_buffer_t *buf); +uint32_t qd_buffer_dec_fanout(qd_buffer_t *buf); /** * Advance the buffer by len. Does not manipulate the contents of the buffer diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index c26caef..01e3913 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -360,24 +360,14 @@ bool qd_message_tag_sent(qd_message_t *msg); void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent); /** - * Get the number of receivers for this message. - * - * @param msg A pointer to the message. - */ -size_t qd_message_fanout(qd_message_t *msg); - -/** * Increase the fanout of the message by 1. * - * @param msg A pointer to the message. - */ -void qd_message_add_fanout(qd_message_t *msg); - -/** - * Increments the num_closed_receivers by 1. This is necessary to track the number of receivers that - * dropped out during or just before transmission of a large message. + * @param in_msg A pointer to the inbound message. + * @param out_msg A pointer to the outbound message or 0 if forwarding to a + * local subscriber. */ -void qd_message_add_num_closed_receivers(qd_message_t *in_msg); +void qd_message_add_fanout(qd_message_t *in_msg, + qd_message_t *out_msg); /** * Disable the Q2-holdoff for this message. diff --git a/src/buffer.c b/src/buffer.c index e404850..a127953 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -53,6 +53,7 @@ qd_buffer_t *qd_buffer(void) void qd_buffer_free(qd_buffer_t *buf) { if (!buf) return; + sys_atomic_destroy(&buf->bfanout); free_qd_buffer_t(buf); } @@ -87,14 +88,22 @@ void qd_buffer_insert(qd_buffer_t *buf, size_t len) assert(buf->size <= BUFFER_SIZE); } -void qd_buffer_add_fanout(qd_buffer_t *buf) + +uint32_t qd_buffer_set_fanout(qd_buffer_t *buf, uint32_t value) +{ + return sys_atomic_set(&buf->bfanout, value); +} + + +uint32_t qd_buffer_inc_fanout(qd_buffer_t *buf) { - sys_atomic_inc(&buf->bfanout); + return sys_atomic_inc(&buf->bfanout); } -size_t qd_buffer_fanout(qd_buffer_t *buf) + +uint32_t qd_buffer_dec_fanout(qd_buffer_t *buf) { - return buf->bfanout; + return sys_atomic_dec(&buf->bfanout); } diff --git a/src/message.c b/src/message.c index c342485..36d7d09 100644 --- a/src/message.c +++ b/src/message.c @@ -614,7 +614,6 @@ static int qd_check_and_advance(qd_buffer_t **buffer, // // Pattern matched and tag is expected. Mark the beginning of the section. // - location->parsed = 1; location->buffer = *buffer; location->offset = *cursor - qd_buffer_base(*buffer); location->length = 0; @@ -664,6 +663,30 @@ static int qd_check_and_advance(qd_buffer_t **buffer, if (consume) advance(&test_cursor, &test_buffer, consume); + // + // increment the reference count of the parsed section as location now + // references it. Note that the cursor has advanced to the octet after the + // parsed section, so be careful not to include an extra buffer past the + // end + // + qd_buffer_t *start = *buffer; + qd_buffer_t *last = test_buffer; + if (last != start && last != 0) { + if (test_cursor == qd_buffer_base(last)) { + // last does not include octets for the current section + last = DEQ_PREV(last); + } + } + + while (start) { + qd_buffer_inc_fanout(start); + if (start == last) + break; + start = DEQ_NEXT(start); + } + + location->parsed = 1; + *cursor = test_cursor; *buffer = test_buffer; return 1; @@ -874,6 +897,7 @@ qd_message_t *qd_message() msg->cursor.cursor = 0; msg->send_complete = false; msg->tag_sent = false; + msg->is_fanout = false; msg->content = new_qd_message_content_t(); @@ -917,20 +941,35 @@ void qd_message_free(qd_message_t *in_msg) if (content->ma_pf_trace) qd_parse_free(content->ma_pf_trace); - qd_buffer_t *buf = DEQ_HEAD(content->buffers); - while (buf) { - DEQ_REMOVE_HEAD(content->buffers); - qd_buffer_free(buf); - buf = DEQ_HEAD(content->buffers); - } + qd_buffer_list_free_buffers(&content->buffers); if (content->pending) qd_buffer_free(content->pending); sys_mutex_free(content->lock); free_qd_message_content_t(content); - } + } else if (msg->is_fanout) { + // + // Adjust the content's fanout count and decrement all buffer fanout + // counts starting with the msg cursor. If the buffer count drops to + // zero we can free it. + // + LOCK(content->lock); + + qd_buffer_t *buf = msg->cursor.buffer; + while (buf) { + qd_buffer_t *next_buf = DEQ_NEXT(buf); + if (qd_buffer_dec_fanout(buf) == 1) { + DEQ_REMOVE(content->buffers, buf); + qd_buffer_free(buf); + } + buf = next_buf; + } + --content->fanout; + + UNLOCK(content->lock); + } free_qd_message_t((qd_message_t*) msg); } @@ -958,6 +997,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) copy->cursor.cursor = 0; copy->send_complete = false; copy->tag_sent = false; + copy->is_fanout = false; qd_message_message_annotations((qd_message_t*) copy); @@ -1062,26 +1102,29 @@ void qd_message_set_discard(qd_message_t *msg, bool discard) pvt_msg->content->discard = discard; } -size_t qd_message_fanout(qd_message_t *in_msg) -{ - if (!in_msg) - return 0; - qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - return msg->content->fanout; -} -void qd_message_add_fanout(qd_message_t *in_msg) +void qd_message_add_fanout(qd_message_t *in_msg, + qd_message_t *out_msg) { - assert(in_msg); - qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - sys_atomic_inc(&msg->content->fanout); -} -void qd_message_add_num_closed_receivers(qd_message_t *in_msg) -{ + // out_msg will be 0 if we are forwarding to an internal subscriber (like + // $management). If so we treat in_msg like an out_msg assert(in_msg); - qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - msg->content->num_closed_receivers++; + qd_message_pvt_t *msg = (qd_message_pvt_t *)((out_msg) ? out_msg : in_msg); + msg->is_fanout = true; + + qd_message_content_t *content = msg->content; + + LOCK(content->lock); + ++content->fanout; + + // do not free the buffers until all fanout consumers are done with them + qd_buffer_t *buf = DEQ_HEAD(content->buffers); + while (buf) { + qd_buffer_inc_fanout(buf); + buf = DEQ_NEXT(buf); + } + UNLOCK(content->lock); } @@ -1236,6 +1279,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) } // Loop until msg is complete, error seen, or incoming bytes are consumed + qd_message_content_t *content = msg->content; bool recv_error = false; while (1) { // @@ -1247,56 +1291,58 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) if (at_eos || recv_error) { // Message is complete - LOCK(msg->content->lock); + LOCK(content->lock); { // Append last buffer if any with data - if (msg->content->pending) { - if (qd_buffer_size(msg->content->pending) > 0) { - // pending buffer has bytes that are port of message - DEQ_INSERT_TAIL(msg->content->buffers, - msg->content->pending); + if (content->pending) { + if (qd_buffer_size(content->pending) > 0) { + // pending buffer has bytes that are part of message + qd_buffer_set_fanout(content->pending, content->fanout); + DEQ_INSERT_TAIL(content->buffers, + content->pending); } else { // pending buffer is empty - qd_buffer_free(msg->content->pending); + qd_buffer_free(content->pending); } - msg->content->pending = 0; + content->pending = 0; } else { // pending buffer is absent } - msg->content->receive_complete = true; - msg->content->aborted = pn_delivery_aborted(delivery); - msg->content->input_link = 0; + content->receive_complete = true; + content->aborted = pn_delivery_aborted(delivery); + content->input_link = 0; // unlink message and delivery pn_record_set(record, PN_DELIVERY_CTX, 0); } - UNLOCK(msg->content->lock); + UNLOCK(content->lock); break; } // // Handle a missing or full pending buffer // - if (!msg->content->pending) { + if (!content->pending) { // Pending buffer is absent: get a new one - msg->content->pending = qd_buffer(); + content->pending = qd_buffer(); } else { // Pending buffer exists - if (qd_buffer_capacity(msg->content->pending) == 0) { + if (qd_buffer_capacity(content->pending) == 0) { // Pending buffer is full - LOCK(msg->content->lock); - DEQ_INSERT_TAIL(msg->content->buffers, msg->content->pending); - msg->content->pending = 0; + LOCK(content->lock); + qd_buffer_set_fanout(content->pending, content->fanout); + DEQ_INSERT_TAIL(content->buffers, content->pending); + content->pending = 0; if (qd_message_Q2_holdoff_should_block((qd_message_t *)msg)) { if (!qd_link_is_q2_limit_unbounded(qdl)) { - msg->content->q2_input_holdoff = true; - UNLOCK(msg->content->lock); + content->q2_input_holdoff = true; + UNLOCK(content->lock); break; } } - UNLOCK(msg->content->lock); - msg->content->pending = qd_buffer(); + UNLOCK(content->lock); + content->pending = qd_buffer(); } else { // Pending buffer still has capacity } @@ -1306,8 +1352,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // Try to fill the remaining space in the pending buffer. // rc = pn_link_recv(link, - (char*) qd_buffer_cursor(msg->content->pending), - qd_buffer_capacity(msg->content->pending)); + (char*) qd_buffer_cursor(content->pending), + qd_buffer_capacity(content->pending)); if (rc < 0) { // error or eos seen. next pass breaks out of loop @@ -1317,7 +1363,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // We have received a positive number of bytes for the message. Advance // the cursor in the buffer. // - qd_buffer_insert(msg->content->pending, rc); + qd_buffer_insert(content->pending, rc); } else { // // We received zero bytes, and no PN_EOS. This means that we've received @@ -1479,7 +1525,7 @@ void qd_message_send(qd_message_t *in_msg, if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { - if (msg->content->aborted) { + if (content->aborted) { // Message is aborted before any part of it has been sent. // Declare the message to be sent, msg->send_complete = true; @@ -1585,98 +1631,108 @@ void qd_message_send(qd_message_t *in_msg, pn_session_t *pns = pn_link_session(pnl); - while (msg->content->aborted || - (buf && - (msg->cursor.cursor < qd_buffer_cursor(buf) || buf->next != 0) && - pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER)) { - - if (msg->content->aborted) { - if (pn_link_current(pnl)) { - msg->send_complete = true; - if (!pn_delivery_aborted(pn_link_current(pnl))) { - pn_delivery_abort(pn_link_current(pnl)); - } - } - break; - } + while (!content->aborted + && buf + && pn_session_outgoing_bytes(pns) <= QD_QLIMIT_Q3_UPPER) { + // This will send the remaining data in the buffer if any. There may be + // zero bytes left to send if we stopped here last time and there was + // no next buf + // size_t buf_size = qd_buffer_size(buf); - - // This will send the remaining data in the buffer if any. int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf)); + ssize_t bytes_sent = 0; if (num_bytes_to_send > 0) { - // We are deliberately avoiding the return value of pn_link_send because we can't do anything nice with it. - (void) pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send); + bytes_sent = pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send); } - // If the entire message has already been received, taking out this lock is not that expensive - // because there is no contention for this lock. - LOCK(msg->content->lock); - - qd_buffer_t *next_buf = DEQ_NEXT(buf); - if (next_buf) { - // There is a next buffer, the previous buffer has been fully sent by now. - qd_buffer_add_fanout(buf); - - if (qd_message_fanout(in_msg) - msg->content->num_closed_receivers == qd_buffer_fanout(buf)) { - qd_buffer_t *local_buf = DEQ_HEAD(content->buffers); - while (local_buf && local_buf != next_buf) { - DEQ_REMOVE_HEAD(content->buffers); - qd_buffer_free(local_buf); - if (!msg->content->buffers_freed) - msg->content->buffers_freed = true; - - local_buf = DEQ_HEAD(content->buffers); - - // by freeing a buffer there now may be room to restart a - // stalled message receiver - if (msg->content->q2_input_holdoff) { - if (qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) { - // wake up receive side - // Note: clearing holdoff here is easy compared to - // clearing it in the deferred callback. Tracing - // shows that rx_handler may run and subsequently - // set input holdoff before the deferred handler - // runs. - msg->content->q2_input_holdoff = false; - *restart_rx = true; - } - } - } + LOCK(content->lock); + + if (bytes_sent < 0) { + // + // send error - likely the link has failed and we will eventually + // get a link detach event for this link + // + content->aborted = true; + msg->send_complete = true; + if (!pn_delivery_aborted(pn_link_current(pnl))) { + pn_delivery_abort(pn_link_current(pnl)); } - msg->cursor.buffer = next_buf; - msg->cursor.cursor = qd_buffer_base(next_buf); - } - else { - // There is no next_buf - if (qd_message_receive_complete(in_msg)) { + + qd_log(qd_message_log_source(), + QD_LOG_WARNING, + "Sending data on link %s has failed (code=%zi)", + pn_link_name(pnl), bytes_sent); + + } else { + + msg->cursor.cursor += bytes_sent; + + if (bytes_sent == num_bytes_to_send) { // - // There is no more of the message coming, this means - // that we have completely sent out the message. + // sent the whole buffer. + // Can we move to the next buffer? Only if there is a next buffer + // or we are at the end and done sending this message // - msg->send_complete = true; - msg->cursor.buffer = 0; - msg->cursor.cursor = 0; + qd_buffer_t *next_buf = DEQ_NEXT(buf); + bool complete = qd_message_receive_complete(in_msg); + + if (next_buf || complete) { + // + // this buffer may be freed if there are no more references to it + // + uint32_t ref_count = (msg->is_fanout) ? qd_buffer_dec_fanout(buf) : 1; + if (ref_count == 1) { + + DEQ_REMOVE(content->buffers, buf); + qd_buffer_free(buf); + ++content->buffers_freed; + + // by freeing a buffer there now may be room to restart a + // stalled message receiver + if (content->q2_input_holdoff) { + if (qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) { + // wake up receive side + // Note: clearing holdoff here is easy compared to + // clearing it in the deferred callback. Tracing + // shows that rx_handler may run and subsequently + // set input holdoff before the deferred handler + // runs. + content->q2_input_holdoff = false; + *restart_rx = true; + } + } + } // end free buffer - if (msg->content->aborted) { - if (!pn_delivery_aborted(pn_link_current(pnl))) { - pn_delivery_abort(pn_link_current(pnl)); - } + msg->cursor.buffer = next_buf; + msg->cursor.cursor = (next_buf) ? qd_buffer_base(next_buf) : 0; + + msg->send_complete = (complete && !next_buf); } - } - else { + + buf = next_buf; + + } else if (num_bytes_to_send && bytes_sent == 0) { // - // There is more of the message to come, update your cursor pointers - // you will come back into this function to deliver more as bytes arrive + // the proton link cannot take anymore data, + // retry later... // - msg->cursor.buffer = buf; - msg->cursor.cursor = qd_buffer_at(buf, buf_size); + buf = 0; + qd_log(qd_message_log_source(), QD_LOG_DEBUG, + "Link %s output limit reached", pn_link_name(pnl)); } } - UNLOCK(msg->content->lock); + UNLOCK(content->lock); + } - buf = next_buf; + if (content->aborted) { + if (pn_link_current(pnl)) { + msg->send_complete = true; + if (!pn_delivery_aborted(pn_link_current(pnl))) { + pn_delivery_abort(pn_link_current(pnl)); + } + } } *q3_stalled = (pn_session_outgoing_bytes(pns) > QD_QLIMIT_Q3_UPPER); @@ -1710,7 +1766,7 @@ static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_ qd_error_clear(); // - // In the case of a streaming or multi buffer message, there is a change that some buffers might be freed before the entire + // In the case of a streaming or multi buffer message, there is a chance that some buffers might be freed before the entire // message has arrived in which case we cannot reliably check the message using the depth. // if (content->buffers_freed) diff --git a/src/message_private.h b/src/message_private.h index 2402e58..850c534 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -69,6 +69,8 @@ typedef struct { sys_atomic_t ref_count; // The number of messages referencing this qd_buffer_list_t buffers; // The buffer chain containing the message qd_buffer_t *pending; // Buffer owned by and filled by qd_message_receive + uint64_t buffers_freed; // count of large msg buffers freed on send + qd_field_location_t section_message_header; // The message header list qd_field_location_t section_delivery_annotation; // The delivery annotation map qd_field_location_t section_message_annotation; // The message annotation map @@ -107,8 +109,7 @@ typedef struct { qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; - sys_atomic_t fanout; // The number of receivers for this message. This number does not include in-process subscribers. - int num_closed_receivers; + uint32_t fanout; // The number of receivers for this message, including in-process subscribers. qd_link_t *input_link; // message received on this link bool ma_parsed; // have parsed annotations in incoming message @@ -117,7 +118,6 @@ typedef struct { bool q2_input_holdoff; // hold off calling pn_link_recv bool aborted; // receive completed with abort flag set bool disable_q2_holdoff; // Disable the Q2 flow control - bool buffers_freed; // Has at least one buffer been freed ? bool priority_parsed; bool priority_present; uint8_t priority; // The priority of this message @@ -136,6 +136,7 @@ typedef struct { bool strip_annotations_in; bool send_complete; // Has the message been completely received and completely sent? bool tag_sent; // Tags are sent + bool is_fanout; // If msg is an outgoing fanout } qd_message_pvt_t; ALLOC_DECLARE(qd_message_t); diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 6bdc781..24741fe 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -719,10 +719,6 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c peer = qdr_delivery_next_peer_CT(dlv); } - if (dlv->link->link_direction == QD_OUTGOING) { - qdr_delivery_add_num_closed_receivers(dlv); - } - // // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc // diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 1d6a208..a58e165 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -131,7 +131,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in // // Add one to the message fanout. This will later be used in the qd_message_send function that sends out messages. // - qd_message_add_fanout(msg); + qd_message_add_fanout(msg, out_dlv->msg); // // Create peer linkage if the outgoing delivery is unsettled. This peer linkage is necessary to deal with dispositions that show up in the future. @@ -302,6 +302,32 @@ static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t * } +/** + * Handle forwarding to a subscription + */ +static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t *sub, qdr_delivery_t *in_dlv, qd_message_t *in_msg, bool receive_complete) +{ + qd_message_add_fanout(in_msg, 0); + + // + // Only if the message has been completely received, forward it to the subscription + // Subscriptions, at the moment, dont have the ability to deal with partial messages + // + if (receive_complete) + qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, in_msg); + else { + // + // Receive is not complete, we will store the sub in + // in_dlv->subscriptions so we can send the message to the subscription + // after the message fully arrives + // + assert(in_dlv); + DEQ_INSERT_TAIL(in_dlv->subscriptions, sub); + qd_message_Q2_holdoff_disable(in_msg); + } +} + + int qdr_forward_multicast_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, @@ -450,21 +476,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, // qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); while (sub) { - // - // Only if the message has been completely received, forward it to the subscription - // Subscriptions, at the moment, dont have the ability to deal with partial messages - // - if (receive_complete) - qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); - else { - // - // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription - // after the message fully arrives - // - DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); - qd_message_Q2_holdoff_disable(msg); - } - + qdr_forward_to_subscriber(core, sub, in_delivery, msg, receive_complete); fanout++; addr->deliveries_to_container++; sub = DEQ_NEXT(sub); @@ -520,21 +532,7 @@ int qdr_forward_closest_CT(qdr_core_t *core, bool receive_complete = qd_message_receive_complete(msg); qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); if (sub) { - - // - // Only if the message has been completely received, forward it. - // Subscriptions, at the moment, dont have the ability to deal with partial messages - // - if (receive_complete) - qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); - else { - // - // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription - // after the message fully arrives - // - DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); - qd_message_Q2_holdoff_disable(msg); - } + qdr_forward_to_subscriber(core, sub, in_delivery, msg, receive_complete); // // If the incoming delivery is not settled, it should be accepted and settled here. diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 4c843e4..b186c09 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -394,12 +394,6 @@ bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery) return qd_message_aborted(delivery->msg); } -void qdr_delivery_add_num_closed_receivers(qdr_delivery_t *delivery) -{ - assert(delivery); - qd_message_add_num_closed_receivers(delivery->msg); -} - void qdr_delivery_decref(qdr_core_t *core, qdr_delivery_t *delivery, const char *label) { diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py index a8895c1..d640ff2 100644 --- a/tests/system_tests_edge_router.py +++ b/tests/system_tests_edge_router.py @@ -1393,8 +1393,8 @@ class MobileAddressMulticastTest(MessagingHandler): self.check_addr_host = self.sender_host if self.large_msg: - for i in range(10000): - self.body += "0123456789101112131415" + self.body = "0123456789101112131415" * 10000 + self.properties = {'big field': 'X' * 32000} def timeout(self): if self.dup_msg: @@ -1468,7 +1468,7 @@ class MobileAddressMulticastTest(MessagingHandler): while self.n_sent < self.count: msg = None if self.large_msg: - msg = Message(body=self.body) + msg = Message(body=self.body, properties=self.properties) else: msg = Message(body="Message %d" % self.n_sent) msg.correlation_id = self.n_sent --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org