DISPATCH-767 - Added code to support multi-frame message streaming. 1. Added new core API function qdr_deliver_continue() that will continue delivering a large message. 2. Modified qdr_link_process_deliveries() to not remove deliveries from the undelivered list until they are fully delivered. 3. Modified qd_message_receive() to receive partial messages. 4. Modified qd_message_send() to be able to handle streaming send. This function can now be called multiple times for the same message. It keeps internal pointers to the point up to which the message has been sent and is able to continue where it left off. Message content buffers are freed as soon as the message has been sent to all recipients. 5. Added peer linkage for large settled deliveries and added a settled list to handle abrupt connection terminations when large messages are being transmitted. 6. Added unit tests to test large messages.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c9262728 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c9262728 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c9262728 Branch: refs/heads/master Commit: c9262728dc1a6d1bcdc5a68f4b19f1b9d2221d53 Parents: 1ca80b6 Author: Ganesh Murthy <gmur...@redhat.com> Authored: Wed Jul 5 11:51:06 2017 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Thu Aug 10 16:10:09 2017 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/buffer.h | 21 ++ include/qpid/dispatch/message.h | 61 ++++- include/qpid/dispatch/router_core.h | 8 +- src/buffer.c | 35 ++- src/message.c | 360 +++++++++++++++++++------- src/message_private.h | 15 +- src/router_core/connections.c | 36 +++ src/router_core/forwarder.c | 80 ++++-- src/router_core/router_core.c | 9 + src/router_core/router_core_private.h | 31 ++- src/router_core/transfer.c | 324 ++++++++++++++++++++---- src/router_node.c | 393 ++++++++++++++++------------- tests/system_tests_one_router.py | 61 ++++- tests/system_tests_two_routers.py | 61 ++++- 14 files changed, 1133 insertions(+), 362 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/buffer.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/buffer.h b/include/qpid/dispatch/buffer.h index e913377..d4fcd15 100644 --- a/include/qpid/dispatch/buffer.h +++ b/include/qpid/dispatch/buffer.h @@ -27,15 +27,19 @@ */ #include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/atomic.h> typedef struct qd_buffer_t qd_buffer_t; DEQ_DECLARE(qd_buffer_t, qd_buffer_list_t); +extern size_t BUFFER_SIZE; + /** A raw byte buffer .*/ struct qd_buffer_t { DEQ_LINKS(qd_buffer_t); unsigned int size; ///< Size of 
data content + sys_atomic_t bfanout; // The number of receivers for this buffer }; /** @@ -117,6 +121,23 @@ void qd_buffer_list_free_buffers(qd_buffer_list_t *list); */ unsigned int qd_buffer_list_length(const qd_buffer_list_t *list); +/* + * Increase the fanout by 1. How many receivers should this buffer be sent to. + */ +void qd_buffer_add_fanout(qd_buffer_t *buf); + +/** + * Return the buffer's fanout. + */ +size_t qd_buffer_fanout(qd_buffer_t *buf); + +/** + * Advance the buffer by len. Does not manipulate the contents of the buffer + * @param buf A pointer to an allocated buffer + * @param len The number of octets that by which the buffer should be advanced. + */ +unsigned char *qd_buffer_at(qd_buffer_t *buf, size_t len); + ///@} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/message.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index d3f2bb4..111c6a0 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -201,9 +201,9 @@ int qd_message_get_phase_annotation(const qd_message_t *msg); void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t *ingress_field); /** - * Receive message data via a delivery. This function may be called more than once on the same - * delivery if the message spans multiple frames. Once a complete message has been received, this - * function shall return the message. + * Receive message data frame by frame via a delivery. This function may be called more than once on the same + * delivery if the message spans multiple frames. Always returns a message. The message buffers are filled up to the point with the data that was been received so far. + * The buffer keeps filling up on successive calls to this function. 
* * @param delivery An incoming delivery from a link * @return A pointer to the complete message or 0 if the message is not yet complete. @@ -296,6 +296,61 @@ qd_parsed_field_t *qd_message_get_trace (qd_message_t *msg); */ int qd_message_get_phase_val (qd_message_t *msg); +/* + * Should the message be discarded. + * A message can be discarded if the disposition is released or rejected. + * + * @param msg A pointer to the message. + **/ +bool qd_message_is_discard(qd_message_t *msg); + +/** + *Set the discard field on the message to to the passed in boolean value. + * + * @param msg A pointer to the message. + * @param discard - the boolean value of discard. + */ +void qd_message_set_discard(qd_message_t *msg, bool discard); + +/** + * Has the message been completely received? + * Return true if the message is fully received + * Returns false if only the partial message has been received, if there is more of the message to be received. + * + * @param msg A pointer to the message. + */ +bool qd_message_receive_complete(qd_message_t *msg); + +/** + * Returns true if the message has been completely received AND the message has been completely sent. + */ +bool qd_message_send_complete(qd_message_t *msg); + +/** + * Returns true if the delivery tag has already been sent. + */ +bool qd_message_tag_sent(qd_message_t *msg); + + +/** + * Sets if the delivery tag has already been sent out or not. + */ +void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent); + +/** + * Get the number of receivers for this message. + * + * @param msg A pointer to the message. + */ +size_t qd_message_fanout(qd_message_t *msg); + +/** + * Increase the fanout of the message by 1. + * + * @param msg A pointer to the message. 
+ */ +void qd_message_add_fanout(qd_message_t *msg); + ///@} #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 0031ed7..18499e0 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -541,8 +541,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled, const uint8_t *tag, int tag_length, uint64_t disposition, pn_data_t* disposition_state); +qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *delivery); -void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit); +int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit); void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode); @@ -589,6 +590,11 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *len qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery); qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery); void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* pdlv, bool update_disposition); +bool qdr_delivery_send_complete(const qdr_delivery_t *delivery); +bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery); +void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent); +bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery); +void qdr_delivery_set_cleared_proton_ref(qdr_delivery_t *dlv, bool cleared_proton_ref); /** ****************************************************************************** http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/buffer.c ---------------------------------------------------------------------- diff --git 
a/src/buffer.c b/src/buffer.c index 57f30c9..14cf229 100644 --- a/src/buffer.c +++ b/src/buffer.c @@ -24,17 +24,18 @@ #include <string.h> -static size_t buffer_size = 512; -static int size_locked = 0; +size_t BUFFER_SIZE = 512; + +static int size_locked = 0; ALLOC_DECLARE(qd_buffer_t); -ALLOC_DEFINE_CONFIG(qd_buffer_t, sizeof(qd_buffer_t), &buffer_size, 0); +ALLOC_DEFINE_CONFIG(qd_buffer_t, sizeof(qd_buffer_t), &BUFFER_SIZE, 0); void qd_buffer_set_size(size_t size) { assert(!size_locked); - buffer_size = size; + BUFFER_SIZE = size; } @@ -44,7 +45,8 @@ qd_buffer_t *qd_buffer(void) qd_buffer_t *buf = new_qd_buffer_t(); DEQ_ITEM_INIT(buf); - buf->size = 0; + buf->size = 0; + sys_atomic_init(&buf->bfanout, 0); return buf; } @@ -70,7 +72,7 @@ unsigned char *qd_buffer_cursor(qd_buffer_t *buf) size_t qd_buffer_capacity(qd_buffer_t *buf) { - return buffer_size - buf->size; + return BUFFER_SIZE - buf->size; } @@ -83,7 +85,26 @@ size_t qd_buffer_size(qd_buffer_t *buf) void qd_buffer_insert(qd_buffer_t *buf, size_t len) { buf->size += len; - assert(buf->size <= buffer_size); + assert(buf->size <= BUFFER_SIZE); +} + +void qd_buffer_add_fanout(qd_buffer_t *buf) +{ + sys_atomic_inc(&buf->bfanout); +} + +size_t qd_buffer_fanout(qd_buffer_t *buf) +{ + return buf->bfanout; +} + + +unsigned char *qd_buffer_at(qd_buffer_t *buf, size_t len) +{ + // If the len is greater than the buffer size, we might point to some garbage. + // We dont want that to happen, so do the assert. 
+ assert(len <= BUFFER_SIZE); + return ((unsigned char*) &buf[1]) + len; } unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const qd_buffer_list_t *src) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index 2dbe9ee..0788194 100644 --- a/src/message.c +++ b/src/message.c @@ -838,6 +838,13 @@ qd_message_t *qd_message() DEQ_INIT(msg->ma_trace); DEQ_INIT(msg->ma_ingress); msg->ma_phase = 0; + msg->ma_phase = 0; + msg->sent_depth = QD_DEPTH_NONE; + msg->cursor.buffer = 0; + msg->cursor.cursor = 0; + msg->send_complete = false; + msg->tag_sent = false; + msg->content = new_qd_message_content_t(); if (msg->content == 0) { @@ -913,6 +920,12 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) copy->content = content; + copy->sent_depth = QD_DEPTH_NONE; + copy->cursor.buffer = 0; + copy->cursor.cursor = 0; + copy->send_complete = false; + copy->tag_sent = false; + qd_message_message_annotations((qd_message_t*) copy); sys_atomic_inc(&content->ref_count); @@ -999,11 +1012,84 @@ void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t qd_compose_free(ingress_field); } +bool qd_message_is_discard(qd_message_t *msg) +{ + if (!msg) + return false; + qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; + return pvt_msg->content->discard; +} + +void qd_message_set_discard(qd_message_t *msg, bool discard) +{ + if (!msg) + return; + + qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg; + pvt_msg->content->discard = discard; +} + +size_t qd_message_fanout(qd_message_t *in_msg) +{ + if (!in_msg) + return 0; + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->content->fanout; +} + +void qd_message_add_fanout(qd_message_t *in_msg) +{ + assert(in_msg); + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + sys_atomic_inc(&msg->content->fanout); +} + +bool 
qd_message_receive_complete(qd_message_t *in_msg) +{ + if (!in_msg) + return false; + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->content->receive_complete; +} + +bool qd_message_send_complete(qd_message_t *in_msg) +{ + if (!in_msg) + return false; + + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->send_complete; +} + +bool qd_message_tag_sent(qd_message_t *in_msg) +{ + if (!in_msg) + return false; + + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->tag_sent; +} + +void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent) +{ + if (!in_msg) + return; + + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + msg->tag_sent = tag_sent; +} + +qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->cursor; +} + qd_message_t *qd_message_receive(pn_delivery_t *delivery) { pn_link_t *link = pn_delivery_link(delivery); ssize_t rc; - qd_buffer_t *buf; + qd_buffer_t *buf = 0; pn_record_t *record = pn_delivery_attachments(delivery); qd_message_pvt_t *msg = (qd_message_pvt_t*) pn_record_get(record, PN_DELIVERY_CTX); @@ -1023,27 +1109,48 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) } // + // The discard flag indicates if we should continue receiving the message. + // This is pertinent in the case of large messages. When large messages are being received, we try to send out part of the + // message that has been received so far. If we not able to send it anywhere, there is no need to keep creating buffers + // + bool discard = qd_message_is_discard((qd_message_t*)msg); + + // // Get a reference to the tail buffer on the message. This is the buffer into which - // we will store incoming message data. If there is no buffer in the message, allocate - // an empty one and add it to the message. + // we will store incoming message data. 
If there is no buffer in the message, this is the + // first time we are here and we need to allocate an empty one and add it to the message. // - buf = DEQ_TAIL(msg->content->buffers); - if (!buf) { - buf = qd_buffer(); - DEQ_INSERT_TAIL(msg->content->buffers, buf); + if (!discard) { + buf = DEQ_TAIL(msg->content->buffers); + if (!buf) { + buf = qd_buffer(); + DEQ_INSERT_TAIL(msg->content->buffers, buf); + } } while (1) { - // - // Try to receive enough data to fill the remaining space in the tail buffer. - // - rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf)); + if (discard) { + char dummy[BUFFER_SIZE]; + rc = pn_link_recv(link, dummy, BUFFER_SIZE); + } + else { + // + // Try to receive enough data to fill the remaining space in the tail buffer. + // + + rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), qd_buffer_capacity(buf)); + } // // If we receive PN_EOS, we have come to the end of the message. // if (rc == PN_EOS) { // + // We have received the entire message since rc == PN_EOS, set the receive_complete flag to true + // + msg->content->receive_complete = true; + + // // Clear the value in the record with key PN_DELIVERY_CTX // pn_record_set(record, PN_DELIVERY_CTX, 0); @@ -1053,9 +1160,10 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // will only happen if the size of the message content is an exact multiple // of the buffer size. // - - if (qd_buffer_size(buf) == 0) { + if (buf && qd_buffer_size(buf) == 0) { + sys_mutex_lock(msg->content->lock); DEQ_REMOVE_TAIL(msg->content->buffers); + sys_mutex_unlock(msg->content->lock); qd_buffer_free(buf); } @@ -1063,6 +1171,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) } if (rc > 0) { + if (discard) + continue; // // We have received a positive number of bytes for the message. Advance // the cursor in the buffer. 
@@ -1073,17 +1183,21 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) // If the buffer is full, allocate a new empty buffer and append it to the // tail of the message's list. // + sys_mutex_lock(msg->content->lock); if (qd_buffer_capacity(buf) == 0) { buf = qd_buffer(); DEQ_INSERT_TAIL(msg->content->buffers, buf); } + sys_mutex_unlock(msg->content->lock); + } else // // We received zero bytes, and no PN_EOS. This means that we've received // all of the data available up to this point, but it does not constitute // the entire message. We'll be back later to finish it up. + // Return the message so that the caller can start sending out whatever we have received so far // - break; + return (qd_message_t*) msg; } return 0; @@ -1226,98 +1340,160 @@ void qd_message_send(qd_message_t *in_msg, { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; qd_message_content_t *content = msg->content; - qd_buffer_t *buf = DEQ_HEAD(content->buffers); - unsigned char *cursor; + qd_buffer_t *buf = 0; pn_link_t *pnl = qd_link_pn(link); - qd_buffer_list_t new_ma; - qd_buffer_list_t new_ma_trailer; - DEQ_INIT(new_ma); - DEQ_INIT(new_ma_trailer); + int fanout = qd_message_fanout(in_msg); - // Process the message annotations if any - compose_message_annotations(msg, &new_ma, &new_ma_trailer, strip_annotations); + if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { - // - // Send header if present - // - cursor = qd_buffer_base(buf); - if (content->section_message_header.length > 0) { - buf = content->section_message_header.buffer; - cursor = content->section_message_header.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_message_header.length + content->section_message_header.hdr_length, - send_handler, (void*) pnl); - } + qd_buffer_list_t new_ma; + qd_buffer_list_t new_ma_trailer; + DEQ_INIT(new_ma); + DEQ_INIT(new_ma_trailer); - // - // Send delivery annotation if present - // - if (content->section_delivery_annotation.length > 0) { - buf = 
content->section_delivery_annotation.buffer; - cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length, - send_handler, (void*) pnl); - } + // Process the message annotations if any + compose_message_annotations(msg, &new_ma, &new_ma_trailer, strip_annotations); - // - // Send new message annotations map start if any - // - qd_buffer_t *da_buf = DEQ_HEAD(new_ma); - while (da_buf) { - char *to_send = (char*) qd_buffer_base(da_buf); - pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); - da_buf = DEQ_NEXT(da_buf); - } - qd_buffer_list_free_buffers(&new_ma); + // + // Start with the very first buffer; + // + buf = DEQ_HEAD(content->buffers); - // - // Annotations possibly include an opaque blob of user annotations - // - if (content->field_user_annotations.length > 0) { - qd_buffer_t *buf2 = content->field_user_annotations.buffer; - unsigned char *cursor2 = content->field_user_annotations.offset + qd_buffer_base(buf); - advance(&cursor2, &buf2, - content->field_user_annotations.length, - send_handler, (void*) pnl); - } - // - // Annotations may include the v1 new_ma_trailer - // - qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer); - while (ta_buf) { - char *to_send = (char*) qd_buffer_base(ta_buf); - pn_link_send(pnl, to_send, qd_buffer_size(ta_buf)); - ta_buf = DEQ_NEXT(ta_buf); - } - qd_buffer_list_free_buffers(&new_ma_trailer); + // + // Send header if present + // + unsigned char *cursor = qd_buffer_base(buf); + int header_consume = content->section_message_header.length + content->section_message_header.hdr_length; + if (content->section_message_header.length > 0) { + buf = content->section_message_header.buffer; + cursor = content->section_message_header.offset + qd_buffer_base(buf); + advance(&cursor, &buf, header_consume, send_handler, (void*) pnl); + } + // + // Send delivery annotation if present + // + int da_consume = 
content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length; + if (content->section_delivery_annotation.length > 0) { + buf = content->section_delivery_annotation.buffer; + cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); + advance(&cursor, &buf, da_consume, send_handler, (void*) pnl); + } - // - // Skip over replaced message annotations - // - if (content->section_message_annotation.length > 0) - advance(&cursor, &buf, - content->section_message_annotation.hdr_length + content->section_message_annotation.length, - 0, 0); + // + // Send new message annotations map start if any + // + qd_buffer_t *da_buf = DEQ_HEAD(new_ma); + while (da_buf) { + char *to_send = (char*) qd_buffer_base(da_buf); + pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); + da_buf = DEQ_NEXT(da_buf); + } + qd_buffer_list_free_buffers(&new_ma); + + // + // Annotations possibly include an opaque blob of user annotations + // + if (content->field_user_annotations.length > 0) { + qd_buffer_t *buf2 = content->field_user_annotations.buffer; + unsigned char *cursor2 = content->field_user_annotations.offset + qd_buffer_base(buf); + advance(&cursor2, &buf2, + content->field_user_annotations.length, + send_handler, (void*) pnl); + } + + // + // Annotations may include the v1 new_ma_trailer + // + qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer); + while (ta_buf) { + char *to_send = (char*) qd_buffer_base(ta_buf); + pn_link_send(pnl, to_send, qd_buffer_size(ta_buf)); + ta_buf = DEQ_NEXT(ta_buf); + } + qd_buffer_list_free_buffers(&new_ma_trailer); + + + // + // Skip over replaced message annotations + // + int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length; + if (content->section_message_annotation.length > 0) + advance(&cursor, &buf, ma_consume, 0, 0); + + + msg->cursor.buffer = buf; + + // + // If this message has no header and no delivery annotations and no message annotations, set the 
offset to 0. + // + if (header_consume == 0 && da_consume == 0 && ma_consume ==0) + msg->cursor.cursor = qd_buffer_base(buf); + else + msg->cursor.cursor = cursor; + + msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS; - // - // Send remaining partial buffer - // - if (buf) { - size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf)); - advance(&cursor, &buf, len, send_handler, (void*) pnl); } - // Fall through to process the remaining buffers normally - // Note that 'advance' will have moved us to the next buffer in the chain. + buf = msg->cursor.buffer; + if (!buf) + return; while (buf) { - pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf)); - buf = DEQ_NEXT(buf); + size_t buf_size = qd_buffer_size(buf); + + // This will send the remaining data in the buffer if any. + int num_bytes_to_send = buf_size - (msg->cursor.cursor - qd_buffer_base(buf)); + if (num_bytes_to_send > 0) + pn_link_send(pnl, (const char*)msg->cursor.cursor, num_bytes_to_send); + + // If the entire message has already been received, taking out this lock is not that expensive + // because there is no contention for this lock. + sys_mutex_lock(msg->content->lock); + + qd_buffer_t *next_buf = DEQ_NEXT(buf); + if (next_buf) { + // There is a next buffer, the previous buffer has been fully sent by now. + qd_buffer_add_fanout(buf); + if (fanout == qd_buffer_fanout(buf)) { + qd_buffer_t *local_buf = DEQ_HEAD(content->buffers); + while (local_buf && local_buf != next_buf) { + DEQ_REMOVE_HEAD(content->buffers); + qd_buffer_free(local_buf); + local_buf = DEQ_HEAD(content->buffers); + } + } + msg->cursor.buffer = next_buf; + msg->cursor.cursor = qd_buffer_base(next_buf); + } + else { + if (qd_message_receive_complete(in_msg)) { + // + // There is no next_buf and there is no more of the message coming, this means + // that we have completely sent out the message. 
+ // + msg->send_complete = true; + msg->cursor.buffer = 0; + msg->cursor.cursor = 0; + + } + else { + // + // There is more of the message to come, update your cursor pointers + // you will come back into this function to deliver more as bytes arrive + // + msg->cursor.buffer = buf; + msg->cursor.cursor = qd_buffer_at(buf, buf_size); + } + } + + sys_mutex_unlock(msg->content->lock); + + buf = next_buf; } } @@ -1546,6 +1722,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b { qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_HEADER, 0); qd_message_content_t *content = MSG_CONTENT(msg); + content->receive_complete = true; qd_compose_start_list(field); qd_compose_insert_bool(field, 0); // durable @@ -1594,6 +1771,8 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field) { qd_message_content_t *content = MSG_CONTENT(msg); + content->receive_complete = true; + qd_buffer_list_t *field_buffers = qd_compose_buffers(field); content->buffers = *field_buffers; @@ -1604,6 +1783,7 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field) void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, qd_composed_field_t *field2) { qd_message_content_t *content = MSG_CONTENT(msg); + content->receive_complete = true; qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index a3b459b..073ac7f 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -94,7 +94,6 @@ typedef struct { qd_buffer_t *parse_buffer; unsigned char *parse_cursor; qd_message_depth_t parse_depth; - bool ma_parsed; // have parsed annotations in 
incoming message qd_iterator_t *ma_field_iter_in; // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION @@ -107,11 +106,21 @@ typedef struct { qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; + //qd_parsed_field_t *parsed_message_annotations; + + bool discard; // Should this message be discarded? + bool receive_complete; // true if the message has been completely received, false otherwise + sys_atomic_t fanout; // The number of receivers for this message. This number does not include in-process subscribers. } qd_message_content_t; typedef struct { DEQ_LINKS(qd_message_t); // Deque linkage that overlays the qd_message_t - qd_message_content_t *content; + qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream. + qd_message_depth_t message_depth; // What is the depth of the message that has been received so far + qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on. + bool send_complete; // Has the message been completely received and completely sent? + bool tag_sent; // Tags are sent + qd_message_content_t *content; // The actual content of the message. The content is never copied qd_buffer_list_t ma_to_override; // to field in outgoing message annotations. 
qd_buffer_list_t ma_trace; // trace list in outgoing message annotations qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations @@ -127,6 +136,8 @@ ALLOC_DECLARE(qd_message_content_t); /** Initialize logging */ void qd_message_initialize(); +qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg); + ///@} #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index f9638bf..264068d 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -238,6 +238,7 @@ int qdr_connection_process(qdr_connection_t *conn) if (ref) { link = ref->link; qdr_del_link_ref(&conn->links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK); + link_work = DEQ_HEAD(link->work_list); if (link_work) { DEQ_REMOVE_HEAD(link->work_list); @@ -611,11 +612,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li qdr_delivery_ref_list_t updated_deliveries; qdr_delivery_list_t undelivered; qdr_delivery_list_t unsettled; + qdr_delivery_list_t settled; qdr_link_work_list_t work_list; sys_mutex_lock(conn->work_lock); DEQ_MOVE(link->work_list, work_list); DEQ_MOVE(link->updated_deliveries, updated_deliveries); + DEQ_MOVE(link->undelivered, undelivered); qdr_delivery_t *d = DEQ_HEAD(undelivered); while (d) { @@ -631,6 +634,14 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li d->where = QDR_DELIVERY_NOWHERE; d = DEQ_NEXT(d); } + + DEQ_MOVE(link->settled, settled); + d = DEQ_HEAD(settled); + while (d) { + assert(d->where == QDR_DELIVERY_IN_SETTLED); + d->where = QDR_DELIVERY_NOWHERE; + d = DEQ_NEXT(d); + } sys_mutex_unlock(conn->work_lock); // @@ -736,6 +747,31 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li dlv = DEQ_HEAD(unsettled); } + //Free/unlink/decref the settled 
deliveries. + dlv = DEQ_HEAD(settled); + while (dlv) { + DEQ_REMOVE_HEAD(settled); + peer = qdr_delivery_first_peer_CT(dlv); + qdr_delivery_t *next_peer = 0; + while (peer) { + next_peer = qdr_delivery_next_peer_CT(dlv); + qdr_delivery_unlink_peers_CT(core, dlv, peer); + peer = next_peer; + } + + // + // Account for the lost reference from the Proton delivery + // + if (!dlv->cleared_proton_ref) { + dlv->cleared_proton_ref = true; + qdr_delivery_decref_CT(core, dlv); + } + + // This decref is for the removing the delivery from the settled list + qdr_delivery_decref_CT(core, dlv); + dlv = DEQ_HEAD(settled); + } + // // Remove the reference to this link in the connection's reference lists // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index e31a97a..232868d 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -115,9 +115,15 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in out_dlv->error = 0; // - // Create peer linkage only if the outgoing delivery is not settled + // Add one to the message fanout. This will later be used in the qd_message_send function that sends out messages. // - if (!out_dlv->settled) + qd_message_add_fanout(msg); + + // + // Create peer linkage if the outgoing delivery is unsettled. This peer linkage is necessary to deal with dispositions that show up in the future. + // Also create peer linkage if the message is not yet been completely received. This linkage will help us stream large pre-settled multicast messages. 
+ // + if (!out_dlv->settled || !qd_message_receive_complete(msg)) qdr_delivery_link_peers_CT(in_dlv, out_dlv); return out_dlv; @@ -163,28 +169,30 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link } -void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) +void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv) { - sys_mutex_lock(link->conn->work_lock); + sys_mutex_lock(out_link->conn->work_lock); // // If the delivery is pre-settled and the outbound link is at or above capacity, // discard all pre-settled deliveries on the undelivered list prior to enqueuing // the new delivery. // - if (dlv->settled && link->capacity > 0 && DEQ_SIZE(link->undelivered) >= link->capacity) - qdr_forward_drop_presettled_CT_LH(core, link); + if (out_dlv->settled && out_link->capacity > 0 && DEQ_SIZE(out_link->undelivered) >= out_link->capacity) + qdr_forward_drop_presettled_CT_LH(core, out_link); - DEQ_INSERT_TAIL(link->undelivered, dlv); - dlv->where = QDR_DELIVERY_IN_UNDELIVERED; - qdr_delivery_incref(dlv); + DEQ_INSERT_TAIL(out_link->undelivered, out_dlv); + out_dlv->where = QDR_DELIVERY_IN_UNDELIVERED; + + // This incref is for putting the delivery in the undelivered list + qdr_delivery_incref(out_dlv); // // We must put a work item on the link's work list to represent this pending delivery. // If there's already a delivery item on the tail of the work list, simply join that item // by incrementing the value. 
// - qdr_link_work_t *work = DEQ_TAIL(link->work_list); + qdr_link_work_t *work = DEQ_TAIL(out_link->work_list); if (work && work->work_type == QDR_LINK_WORK_DELIVERY) { work->value++; } else { @@ -192,16 +200,16 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t * ZERO(work); work->work_type = QDR_LINK_WORK_DELIVERY; work->value = 1; - DEQ_INSERT_TAIL(link->work_list, work); - qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + DEQ_INSERT_TAIL(out_link->work_list, work); } - dlv->link_work = work; - sys_mutex_unlock(link->conn->work_lock); + qdr_add_link_ref(&out_link->conn->links_with_work, out_link, QDR_LINK_LIST_CLASS_WORK); + out_dlv->link_work = work; + sys_mutex_unlock(out_link->conn->work_lock); // // Activate the outgoing connection for later processing. // - qdr_connection_activate_CT(core, link->conn); + qdr_connection_activate_CT(core, out_link->conn); } @@ -235,6 +243,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, int fanout = 0; qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0; bool presettled = !!in_delivery ? in_delivery->settled : true; + bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery)); // // If the delivery is not presettled, set the settled flag for forwarding so all @@ -245,7 +254,6 @@ int qdr_forward_multicast_CT(qdr_core_t *core, // if (!presettled) { in_delivery->settled = true; - // // If the router is configured to reject unsettled multicasts, settle and reject this delivery. // @@ -351,9 +359,23 @@ int qdr_forward_multicast_CT(qdr_core_t *core, // // Forward to in-process subscribers // + qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); while (sub) { - qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg); + // + // Only if the message has been completely received, forward it to the subscription + // Subscriptions, at the moment, dont have the ability to deal with partial messages + // + if (receive_complete) + qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); + else + // + // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription + // after the message fully arrives + // + DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); + + fanout++; addr->deliveries_to_container++; sub = DEQ_NEXT(sub); @@ -370,10 +392,13 @@ int qdr_forward_multicast_CT(qdr_core_t *core, else { // // The delivery was not presettled and it was forwarded to at least - // one destination. Accept and settle the delivery. + // one destination. Accept and settle the delivery only if the entire delivery + // has been received. // - in_delivery->disposition = PN_ACCEPTED; - qdr_delivery_push_CT(core, in_delivery); + if (receive_complete) { + in_delivery->disposition = PN_ACCEPTED; + qdr_delivery_push_CT(core, in_delivery); + } } } @@ -395,9 +420,22 @@ int qdr_forward_closest_CT(qdr_core_t *core, // Forward to an in-process subscriber if there is one. // if (!exclude_inprocess) { + bool receive_complete = qd_message_receive_complete(msg); qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); if (sub) { - qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); + + // + // Only if the message has been completely received, forward it. + // Subscriptions, at the moment, dont have the ability to deal with partial messages + // + if (receive_complete) + qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg); + else + // + // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription + // after the message fully arrives + // + DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); // // If the incoming delivery is not settled, it should be accepted and settled here. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 510b427..010ad98 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -308,6 +308,15 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha return addr; } +bool qdr_is_addr_treatment_multicast(qdr_address_t *addr) +{ + if (addr) { + if (addr->treatment == QD_TREATMENT_MULTICAST_FLOOD || addr->treatment == QD_TREATMENT_MULTICAST_ONCE) + return true; + } + return false; +} + void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr) { // Remove the address from the list and hash index http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index da36b14..052ed7f 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -300,7 +300,8 @@ DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); typedef enum { QDR_DELIVERY_NOWHERE = 0, QDR_DELIVERY_IN_UNDELIVERED, - QDR_DELIVERY_IN_UNSETTLED + QDR_DELIVERY_IN_UNSETTLED, + QDR_DELIVERY_IN_SETTLED } qdr_delivery_where_t; typedef struct qdr_delivery_ref_t { @@ -311,6 +312,17 @@ typedef struct qdr_delivery_ref_t { ALLOC_DECLARE(qdr_delivery_ref_t); DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t); +struct 
qdr_subscription_t { + DEQ_LINKS(qdr_subscription_t); + qdr_core_t *core; + qdr_address_t *addr; + qdr_receive_t on_message; + void *on_message_context; +}; + +DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t); + + struct qdr_delivery_t { DEQ_LINKS(qdr_delivery_t); void *context; @@ -334,6 +346,7 @@ struct qdr_delivery_t { qdr_address_t *tracking_addr; int tracking_addr_bit; qdr_link_work_t *link_work; ///< Delivery work item for this delivery + qdr_subscription_list_t subscriptions; qdr_delivery_ref_list_t peers; /// Use this list if there if the delivery has more than one peer. }; @@ -375,6 +388,7 @@ struct qdr_link_t { qdr_auto_link_t *auto_link; ///< [ref] Auto_link that owns this link qdr_delivery_list_t undelivered; ///< Deliveries to be forwarded or sent qdr_delivery_list_t unsettled; ///< Unsettled deliveries + qdr_delivery_list_t settled; ///< Settled deliveries qdr_delivery_ref_list_t updated_deliveries; ///< References to deliveries (in the unsettled list) with updates. 
bool admin_enabled; qdr_link_oper_status_t oper_status; @@ -419,18 +433,6 @@ DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t); void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn); void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn); - -struct qdr_subscription_t { - DEQ_LINKS(qdr_subscription_t); - qdr_core_t *core; - qdr_address_t *addr; - qdr_receive_t on_message; - void *on_message_context; -}; - -DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t); - - struct qdr_address_t { DEQ_LINKS(qdr_address_t); qdr_subscription_list_t subscriptions; ///< In-process message subscribers @@ -489,7 +491,7 @@ struct qdr_address_config_t { ALLOC_DECLARE(qdr_address_config_t); DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t); void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr); - +bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); // // Connection Information @@ -699,6 +701,7 @@ void qdr_delivery_release_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery); bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery); void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery); +void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_link_t *link, qd_message_t *msg); /** * Links the in_dlv to the out_dlv and increments ref counts of both deliveries http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 9453996..4198262 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -26,6 +26,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard); static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard); //================================================================================== // Internal Functions @@ -114,40 +115,82 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * } -void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) +qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv) +{ + qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue"); + action->args.connection.delivery = in_dlv; + + // This incref is for the action reference + qdr_delivery_incref(in_dlv); + qdr_action_enqueue(in_dlv->link->core, action); + return in_dlv; +} + + +int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) { qdr_connection_t *conn = link->conn; qdr_delivery_t *dlv; bool drained = false; int offer = -1; bool settled = false; + bool send_complete = false; + int num_deliveries_completed = 0; if (link->link_direction == QD_OUTGOING) { while (credit > 0 && !drained) { sys_mutex_lock(conn->work_lock); dlv = DEQ_HEAD(link->undelivered); - if (dlv) { - DEQ_REMOVE_HEAD(link->undelivered); - dlv->link_work = 0; - settled = dlv->settled; - if (!settled) { - DEQ_INSERT_TAIL(link->unsettled, dlv); - dlv->where = QDR_DELIVERY_IN_UNSETTLED; - } else - dlv->where = QDR_DELIVERY_NOWHERE; - - credit--; - link->total_deliveries++; - offer = DEQ_SIZE(link->undelivered); - } else - drained = true; sys_mutex_unlock(conn->work_lock); - if (dlv) { - link->credit_to_core--; + settled = dlv->settled; core->deliver_handler(core->user_context, link, dlv, settled); - if (settled) - qdr_delivery_decref(core, dlv); + sys_mutex_lock(conn->work_lock); + send_complete = qdr_delivery_send_complete(dlv); + 
if (send_complete) { + // + // The entire message has been sent. It is now the appropriate time to have the delivery removed + // from the head of the undelivered list and move it to the unsettled list if it is not settled. + // + DEQ_REMOVE_HEAD(link->undelivered); + num_deliveries_completed ++; + dlv->link_work = 0; + + if (settled) { + dlv->where = QDR_DELIVERY_NOWHERE; + + // This decref is for removing this settled delivery from the undelivered list + qdr_delivery_decref(core, dlv); + + } else { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; + } + + credit--; + link->credit_to_core--; + link->total_deliveries++; + offer = DEQ_SIZE(link->undelivered); + } + else { + // + // The message is still being received/sent. + // 1. We cannot remove the delivery from the undelivered list. + // This delivery needs to stay at the head of the undelivered list until the entire message has been sent out i.e other deliveries in the + // undelivered list have to wait before this entire large delivery is sent out + // 2. We need to call deliver_handler so any newly arrived bytes can be pushed out + // 3. We need to break out of this loop otherwise a thread will keep spinning in here until the entire message has been sent out. + // + sys_mutex_unlock(conn->work_lock); + + // + // Note here that we are not incrementing num_deliveries_processed. Since this delivery is still coming in or still being sent out, + // we cannot consider this delivery as fully processed. 
+ // + return num_deliveries_completed; + } + sys_mutex_unlock(conn->work_lock); + } } @@ -156,6 +199,8 @@ void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) else if (offer != -1) core->offer_handler(core->user_context, link, offer); } + + return num_deliveries_completed; } @@ -234,12 +279,49 @@ void qdr_delivery_set_context(qdr_delivery_t *delivery, void *context) delivery->context = context; } +void qdr_delivery_set_cleared_proton_ref(qdr_delivery_t *dlv, bool cleared_proton_ref) +{ + dlv->cleared_proton_ref = cleared_proton_ref; +} + void *qdr_delivery_get_context(qdr_delivery_t *delivery) { return delivery->context; } + +bool qdr_delivery_send_complete(const qdr_delivery_t *delivery) +{ + if (!delivery) + return false; + return qd_message_send_complete(delivery->msg); +} + +bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery) +{ + if (!delivery) + return false; + return qd_message_tag_sent(delivery->msg); +} + +void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent) +{ + if (!delivery) + return; + + qd_message_set_tag_sent(delivery->msg, tag_sent); +} + + +bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery) +{ + if (!delivery) + return false; + return qd_message_receive_complete(delivery->msg); +} + + void qdr_delivery_incref(qdr_delivery_t *delivery) { sys_atomic_inc(&delivery->ref_count); @@ -272,6 +354,8 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, const char **tag, int *len qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery) { + if (!delivery) + return 0; return delivery->msg; } @@ -417,9 +501,19 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de link->modified_deliveries++; } + // + // Free all the peer qdr_delivery_ref_t references + // + qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers); + while (ref) { + qdr_del_delivery_ref(&delivery->peers, ref); + ref = DEQ_HEAD(delivery->peers); + } + 
qd_bitmask_free(delivery->link_exclusion); qdr_error_free(delivery->error); free_qdr_delivery_t(delivery); + } static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv) @@ -427,7 +521,6 @@ static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv) return dlv->peer || DEQ_SIZE(dlv->peers) > 0; } - void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t *out_dlv) { // If there is no delivery or a peer, we cannot link each other. @@ -466,17 +559,38 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_del // If there is no delivery or a peer, we cannot proceed. if (!dlv || !peer) return; - // - // Make sure that the passed in deliveries are indeed peers. - // - assert(dlv->peer == peer); - assert(peer->peer == dlv); - - dlv->peer = 0; - peer->peer = 0; - qdr_delivery_decref_CT(core, dlv); - qdr_delivery_decref_CT(core, peer); + if (dlv->peer) { + // + // This is the easy case. One delivery has only one peer. we can simply + // zero them out and directly decref. + // + assert(dlv->peer == peer); + dlv->peer = 0; + peer->peer = 0; + qdr_delivery_decref_CT(core, dlv); + qdr_delivery_decref_CT(core, peer); + } + else { + // + // The dlv has more than one peer. We are going to find the peer of dlv that match with the passed in peer + // and delete that peer. 
+ // + qdr_delivery_ref_t *dlv_ref = DEQ_HEAD(dlv->peers); + while (dlv_ref) { + qdr_delivery_t * peer_dlv = dlv_ref->dlv; + if (peer_dlv == peer) { + if (peer->peer) { + peer->peer = 0; + qdr_delivery_decref_CT(core, dlv); + } + qdr_del_delivery_ref(&dlv->peers, dlv_ref); + qdr_delivery_decref_CT(core, peer); + break; + } + dlv_ref = DEQ_NEXT(dlv_ref); + } + } } @@ -524,6 +638,7 @@ qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv) void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv) { uint32_t ref_count = sys_atomic_dec(&dlv->ref_count); + assert(ref_count > 0); if (ref_count == 1) @@ -610,6 +725,7 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr) static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr) { + bool receive_complete = qd_message_receive_complete(qdr_delivery_message(dlv)); if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) { // // We are trying to forward a delivery on an address that has no outbound paths @@ -658,23 +774,48 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery // // If the delivery is not settled, release it. // - if (!dlv->settled) + if (!dlv->settled) { qdr_delivery_release_CT(core, dlv); + + // + // Set the discard flag on the message only if the message is not completely received yet. + // + if (!receive_complete) + qd_message_set_discard(dlv->msg, true); + } + + // + // Decrementing the delivery ref count for the action + // qdr_delivery_decref_CT(core, dlv); qdr_link_issue_credit_CT(core, link, 1, false); } else if (fanout > 0) { - if (dlv->settled) { + if (dlv->settled || qdr_is_addr_treatment_multicast(addr)) { // // The delivery is settled. Keep it off the unsettled list and issue // replacement credit for it now. 
// qdr_link_issue_credit_CT(core, link, 1, false); + if (receive_complete) { + // + // This decref is for the action ref + // + qdr_delivery_decref_CT(core, dlv); + } + else { + // + // The message is still coming through since receive_complete is false. We have to put this delivery in the settled list. + // We need to do this because we have linked this delivery to a peer. + // If this connection goes down, we will have to unlink peer so that peer knows that its peer is not-existent anymore + // and need to tell the other side that the message has been aborted. + // - // - // If the delivery has no more references, free it now. - // - assert(!dlv->peer); - qdr_delivery_decref_CT(core, dlv); + // + // Again, don't bother decrementing then incrementing the ref_count, we are still using the action ref count + // + DEQ_INSERT_TAIL(link->settled, dlv); + dlv->where = QDR_DELIVERY_IN_SETTLED; + } } else { // // Again, don't bother decrementing then incrementing the ref_count @@ -703,13 +844,15 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_delivery_t *dlv = action->args.connection.delivery; qdr_link_t *link = dlv->link; - // - // If this is an attach-routed link, put the delivery directly onto the peer link - // + if (link->connected_link) { + // + // If this is an attach-routed link, put the delivery directly onto the peer link + // qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg); qdr_delivery_copy_extension_state(dlv, peer, true); + // // Copy the delivery tag. For link-routing, the delivery tag must be preserved. 
// @@ -717,9 +860,9 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis memcpy(peer->tag, action->args.connection.tag, peer->tag_length); qdr_forward_deliver_CT(core, link->connected_link, peer); - qd_message_free(dlv->msg); - dlv->msg = 0; + link->total_deliveries++; + if (!dlv->settled) { DEQ_INSERT_TAIL(link->unsettled, dlv); dlv->where = QDR_DELIVERY_IN_UNSETTLED; @@ -754,7 +897,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis } // - // Give the action reference to the qdr_link_forward function. + // Give the action reference to the qdr_link_forward function. Don't decref/incref. // qdr_link_forward_CT(core, link, dlv, addr); } else { @@ -869,6 +1012,99 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool } +static void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv) +{ + qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); + while (peer) { + qdr_link_work_t *work = peer->link_work; + // + // Determines if the peer connection can be activated. + // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed + // after the streaming message has been sent. + // + if (work) { + sys_mutex_lock(peer->link->conn->work_lock); + if (work == DEQ_HEAD(peer->link->work_list)) { + qdr_add_link_ref(&peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK); + sys_mutex_unlock(peer->link->conn->work_lock); + + // + // Activate the outgoing connection for later processing. 
+ // + qdr_connection_activate_CT(core, peer->link->conn); + } + else { + sys_mutex_unlock(peer->link->conn->work_lock); + + } + } + + peer = qdr_delivery_next_peer_CT(in_dlv); + } +} + + +static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + if (discard) + return; + + qdr_delivery_t *in_dlv = action->args.connection.delivery; + + // This decref is for the action reference + qdr_delivery_decref_CT(core, in_dlv); + + // + // If it is already in the undelivered list or it has no peers, don't try to deliver this again. + // + if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED || !qdr_delivery_has_peer_CT(in_dlv)) + return; + + qdr_deliver_continue_peers_CT(core, in_dlv); + + + if (qd_message_receive_complete(qdr_delivery_message(in_dlv))) { + // + // The entire message has now been received. Check to see if there are in process subscriptions that need to + // receive this message. in process subscriptions, at this time, can deal only with full messages. + // + qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions); + while (sub) { + DEQ_REMOVE_HEAD(in_dlv->subscriptions); + qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, in_dlv->msg); + sub = DEQ_HEAD(in_dlv->subscriptions); + } + + // This is a multicast delivery + if (qdr_is_addr_treatment_multicast(in_dlv->link->owning_addr)) { + assert(in_dlv->where == QDR_DELIVERY_IN_SETTLED); + // + // The router will settle on behalf of the receiver in the case of multicast and send out settled + // deliveries to the receivers. + // + in_dlv->disposition = PN_ACCEPTED; + qdr_delivery_push_CT(core, in_dlv); + + // + // The in_dlv has one or more peers. These peers will have to be unlinked. 
+ // + qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); + qdr_delivery_t *next_peer = 0; + while (peer) { + next_peer = qdr_delivery_next_peer_CT(in_dlv); + qdr_delivery_unlink_peers_CT(core, in_dlv, peer); + peer = next_peer; + } + + // Remove the delivery from the settled list and decref the in_dlv. + in_dlv->where = QDR_DELIVERY_NOWHERE; + qdr_delivery_decref_CT(core, in_dlv); // This decref is for removing the delivery from the settled list. + DEQ_REMOVE(in_dlv->link->settled, in_dlv); + } + } +} + + /** * Add link-work to provide credit to the link in an IO thread */ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index dd21fec..8c964e0 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -186,51 +186,47 @@ static qd_iterator_t *router_annotate_message(qd_router_t *router, */ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) { - qd_router_t *router = (qd_router_t*) context; - pn_link_t *pn_link = qd_link_pn(link); - qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); - qd_connection_t *conn = qd_link_connection(link); + qd_router_t *router = (qd_router_t*) context; + pn_link_t *pn_link = qd_link_pn(link); + qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link); + qd_connection_t *conn = qd_link_connection(link); const qd_server_config_t *cf = qd_connection_config(conn); - qdr_delivery_t *delivery = 0; - qd_message_t *msg; + qdr_delivery_t *delivery = pn_delivery_get_context(pnd); // - // Receive the message into a local representation. If the returned message - // pointer is NULL, we have not yet received a complete message. - // - // Note: In the link-routing case, consider cutting the message through. There's - // no reason to wait for the whole message to be received before starting to - // send it. 
+ // Receive the message into a local representation. // - msg = qd_message_receive(pnd); - - if (!msg) - return; + qd_message_t *msg = qd_message_receive(pnd); + bool receive_complete = qd_message_receive_complete(msg); - if (cf->log_message) { - char repr[qd_message_repr_len()]; - char* message_repr = qd_message_repr((qd_message_t*)msg, - repr, - sizeof(repr), - cf->log_bits); - if (message_repr) { - qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s", - pn_link_name(pn_link), - message_repr); + if (receive_complete) { + // + // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). + // + pn_link_advance(pn_link); + + // Since the entire message has been received, we can print out its contents to the log if necessary. + if (cf->log_message) { + char repr[qd_message_repr_len()]; + char* message_repr = qd_message_repr((qd_message_t*)msg, + repr, + sizeof(repr), + cf->log_bits); + if (message_repr) { + qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received %s", + pn_link_name(pn_link), + message_repr); + } } } // - // Consume the delivery. - // - pn_link_advance(pn_link); - - // // If there's no router link, free the message and finish. It's likely that the link // is closing. // if (!rlink) { - qd_message_free(msg); + if (receive_complete) // The entire message has been received but there is nowhere to send it to, free it and do nothing. + qd_message_free(msg); return; } @@ -239,20 +235,33 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) // if (qdr_link_is_routed(rlink)) { pn_delivery_tag_t dtag = pn_delivery_tag(pnd); - delivery = qdr_link_deliver_to_routed_link(rlink, msg, pn_delivery_settled(pnd), - (uint8_t*) dtag.start, dtag.size, - pn_disposition_type(pn_delivery_remote(pnd)), - pn_disposition_data(pn_delivery_remote(pnd))); - + // + // A delivery object was already available via pn_delivery_get_context. 
This means a qdr_delivery was already created. Use it to continue delivery. + // if (delivery) { - if (pn_delivery_settled(pnd)) + qdr_deliver_continue(delivery); + + // + // Settle the proton delivery only if all the data has been received. + // + if (pn_delivery_settled(pnd) && receive_complete) { pn_delivery_settle(pnd); - else { - pn_delivery_set_context(pnd, delivery); - qdr_delivery_set_context(delivery, pnd); - qdr_delivery_incref(delivery); + qdr_delivery_decref(router->router_core, delivery); } } + else { + delivery = qdr_link_deliver_to_routed_link(rlink, + msg, + pn_delivery_settled(pnd), + (uint8_t*) dtag.start, + dtag.size, + pn_disposition_type(pn_delivery_remote(pnd)), + pn_disposition_data(pn_delivery_remote(pnd))); + pn_delivery_set_context(pnd, delivery); + qdr_delivery_set_context(delivery, pnd); + qdr_delivery_incref(delivery); + } + return; } @@ -294,134 +303,142 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) qd_message_depth_t validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS; bool valid_message = qd_message_check(msg, validation_depth); - if (valid_message) { - if (check_user) { - // This connection must not allow proxied user_id - qd_iterator_t *userid_iter = qd_message_field_iterator(msg, QD_FIELD_USER_ID); - if (userid_iter) { - // The user_id property has been specified - if (qd_iterator_remaining(userid_iter) > 0) { - // user_id property in message is not blank - if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) { - // This message is rejected: attempted user proxy is disallowed - qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to user_id proxy violation. 
User:%s", conn->user_id); - pn_link_flow(pn_link, 1); - pn_delivery_update(pnd, PN_REJECTED); - pn_delivery_settle(pnd); - qd_message_free(msg); - qd_iterator_free(userid_iter); - return; - } + if (!valid_message && receive_complete) { + // + // The entire message has been received and the message is still invalid. Reject the message. + // + qd_message_set_discard(msg, true); + pn_link_flow(pn_link, 1); + pn_delivery_update(pnd, PN_REJECTED); + pn_delivery_settle(pnd); + qd_message_free(msg); + } + + if (!valid_message) { + return; + } + + if (delivery) { + qdr_deliver_continue(delivery); + + if (pn_delivery_settled(pnd) && receive_complete) { + pn_delivery_settle(pnd); + qdr_delivery_decref(router->router_core, delivery); + } + return; + } + + if (check_user) { + // This connection must not allow proxied user_id + qd_iterator_t *userid_iter = qd_message_field_iterator(msg, QD_FIELD_USER_ID); + if (userid_iter) { + // The user_id property has been specified + if (qd_iterator_remaining(userid_iter) > 0) { + // user_id property in message is not blank + if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) { + // This message is rejected: attempted user proxy is disallowed + qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to user_id proxy violation. 
User:%s", conn->user_id); + pn_link_flow(pn_link, 1); + pn_delivery_update(pnd, PN_REJECTED); + pn_delivery_settle(pnd); + qd_message_free(msg); + qd_iterator_free(userid_iter); + return; } - qd_iterator_free(userid_iter); } + qd_iterator_free(userid_iter); } + } - qd_message_message_annotations(msg); - qd_bitmask_t *link_exclusions; + qd_message_message_annotations(msg); + qd_bitmask_t *link_exclusions; - qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions); + qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions); - if (anonymous_link) { - qd_iterator_t *addr_iter = 0; - int phase = 0; - - // - // If the message has delivery annotations, get the to-override field from the annotations. - // - qd_parsed_field_t *ma_to = qd_message_get_to_override(msg); - if (ma_to) { - addr_iter = qd_iterator_dup(qd_parse_raw(ma_to)); - phase = qd_message_get_phase_val(msg); - } + if (anonymous_link) { + qd_iterator_t *addr_iter = 0; + int phase = 0; - // - // Still no destination address? Use the TO field from the message properties. - // - if (!addr_iter) { - addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO); - - // - // If the address came from the TO field and we need to apply a tenant-space, - // set the to-override with the annotated address. - // - if (addr_iter && tenant_space) { - qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE); - qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len); - qd_composed_field_t *to_override = qd_compose_subfield(0); - qd_compose_insert_string_iterator(to_override, addr_iter); - qd_message_set_to_override_annotation(msg, to_override); - } - } + // + // If the message has delivery annotations, get the to-override field from the annotations. 
+ // + qd_parsed_field_t *ma_to = qd_message_get_to_override(msg); + if (ma_to) { + addr_iter = qd_iterator_dup(qd_parse_raw(ma_to)); + phase = qd_message_get_phase_annotation(msg); + } + + // + // Still no destination address? Use the TO field from the message properties. + // + if (!addr_iter) { + addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO); - if (addr_iter) { - qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH); - if (phase > 0) - qd_iterator_annotate_phase(addr_iter, '0' + (char) phase); - delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd), - link_exclusions); - } - } else { // - // This is a targeted link, not anonymous. + // If the address came from the TO field and we need to apply a tenant-space, + // set the to-override with the annotated address. // - const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link)); - if (!term_addr) - term_addr = pn_terminus_get_address(qd_link_source(link)); - - if (term_addr) { + if (addr_iter && tenant_space) { + qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE); + qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_len); qd_composed_field_t *to_override = qd_compose_subfield(0); - if (tenant_space) { - qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE); - qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len); - qd_compose_insert_string_iterator(to_override, aiter); - qd_iterator_free(aiter); - } else - qd_compose_insert_string(to_override, term_addr); + qd_compose_insert_string_iterator(to_override, addr_iter); qd_message_set_to_override_annotation(msg, to_override); - int phase = qdr_link_phase(rlink); - if (phase != 0) - qd_message_set_phase_annotation(msg, phase); } - delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions); } - if (delivery) { - if (pn_delivery_settled(pnd)) - pn_delivery_settle(pnd); - else { - 
pn_delivery_set_context(pnd, delivery); - qdr_delivery_set_context(delivery, pnd); - qdr_delivery_incref(delivery); - } - } else { - // - // The message is now and will always be unroutable because there is no address. - // - pn_link_flow(pn_link, 1); - pn_delivery_update(pnd, PN_REJECTED); - pn_delivery_settle(pnd); - qd_message_free(msg); + if (addr_iter) { + qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH); + if (phase > 0) + qd_iterator_annotate_phase(addr_iter, '0' + (char) phase); + delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd), + link_exclusions); } - + } else { // - // Rules for delivering messages: + // This is a targeted link, not anonymous. // - // For addressed (non-anonymous) links: - // to-override must be set (done in the core?) - // uses qdr_link_deliver to hand over to the core + const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link)); + if (!term_addr) + term_addr = pn_terminus_get_address(qd_link_source(link)); + + if (term_addr) { + qd_composed_field_t *to_override = qd_compose_subfield(0); + if (tenant_space) { + qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE); + qd_iterator_annotate_space(aiter, tenant_space, tenant_space_len); + qd_compose_insert_string_iterator(to_override, aiter); + qd_iterator_free(aiter); + } else + qd_compose_insert_string(to_override, term_addr); + qd_message_set_to_override_annotation(msg, to_override); + int phase = qdr_link_phase(rlink); + if (phase != 0) + qd_message_set_phase_annotation(msg, phase); + } + delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions); + } + + if (delivery) { // - // For anonymous links: - // If there's a to-override in the annotations, use that address - // Or, use the 'to' field in the message properties + // Settle the proton delivery only if all the data has arrived // + if (pn_delivery_settled(pnd)) { + if (receive_complete) { + 
pn_delivery_settle(pnd); + return; + } + } - - + // If this delivery is unsettled or if this is a settled multi-frame large message, set the context + pn_delivery_set_context(pnd, delivery); + qdr_delivery_set_context(delivery, pnd); + qdr_delivery_incref(delivery); } else { // - // Message is invalid. Reject the message and don't involve the router core. + // If there is no delivery, the message is now and will always be unroutable because there is no address. // + qd_message_set_discard(msg, true); pn_link_flow(pn_link, 1); pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); @@ -447,6 +464,11 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery if (!delivery) return; + bool receive_complete = qdr_delivery_receive_complete(delivery); + + if (!receive_complete) + return; + pn_disposition_t *disp = pn_delivery_remote(pnd); pn_condition_t *cond = pn_disposition_condition(disp); qdr_error_t *error = qdr_error_from_pn(cond); @@ -459,6 +481,7 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery if (pn_delivery_settled(pnd)) { pn_delivery_set_context(pnd, 0); qdr_delivery_set_context(delivery, 0); + qdr_delivery_set_cleared_proton_ref(delivery, true); // // Don't decref the delivery here. Rather, we will _give_ the reference to the core. 
@@ -479,8 +502,9 @@ static void AMQP_disposition_handler(void* context, qd_link_t *link, pn_delivery // // If settled, close out the delivery // - if (pn_delivery_settled(pnd)) + if (pn_delivery_settled(pnd)) { pn_delivery_settle(pnd); + } } @@ -988,21 +1012,21 @@ static int CORE_link_push(void *context, qdr_link_t *link, int limit) qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); if (!qlink) return 0; - + pn_link_t *plink = qd_link_pn(qlink); if (plink) { int link_credit = pn_link_credit(plink); if (link_credit > limit) link_credit = limit; - qdr_link_process_deliveries(router->router_core, link, link_credit); - return link_credit; - } + if (link_credit > 0) + // We will not bother calling qdr_link_process_deliveries if we have no credit. + return qdr_link_process_deliveries(router->router_core, link, link_credit); + } return 0; } - static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *dlv, bool settled) { qd_router_t *router = (qd_router_t*) context; @@ -1014,37 +1038,61 @@ static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t *d if (!plink) return; - const char *tag; - int tag_length; - - qdr_delivery_tag(dlv, &tag, &tag_length); - - pn_delivery(plink, pn_dtag(tag, tag_length)); - pn_delivery_t *pdlv = pn_link_current(plink); - - // handle any delivery-state on the transfer e.g. transactional-state - qdr_delivery_write_extension_state(dlv, pdlv, true); // // If the remote send settle mode is set to 'settled' then settle the delivery on behalf of the receiver. 
// bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == PN_SND_SETTLED; + pn_delivery_t *pdlv = 0; + + if (!qdr_delivery_tag_sent(dlv)) { + const char *tag; + int tag_length; + + qdr_delivery_tag(dlv, &tag, &tag_length); + + // Create a new proton delivery on link 'plink' + pn_delivery(plink, pn_dtag(tag, tag_length)); + + pdlv = pn_link_current(plink); - if (!settled && !remote_snd_settled) { - pn_delivery_set_context(pdlv, dlv); - qdr_delivery_set_context(dlv, pdlv); - qdr_delivery_incref(dlv); + // handle any delivery-state on the transfer e.g. transactional-state + qdr_delivery_write_extension_state(dlv, pdlv, true); + + // + // If the remote send settle mode is set to 'settled', we should settle the delivery on behalf of the receiver. + // + if (!settled && !remote_snd_settled) { + if (qdr_delivery_get_context(dlv) == 0) { + pn_delivery_set_context(pdlv, dlv); + qdr_delivery_set_context(dlv, pdlv); + qdr_delivery_incref(dlv); + } + } + + qdr_delivery_set_tag_sent(dlv, true); + } + + if (!pdlv) { + pdlv = pn_link_current(plink); } qd_message_send(qdr_delivery_message(dlv), qlink, qdr_link_strip_annotations_out(link)); - if (!settled && remote_snd_settled) - // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver - qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false); + bool send_complete = qdr_delivery_send_complete(dlv); + + if (send_complete) { + if (!settled && remote_snd_settled) { + // Tell the core that the delivery has been accepted and settled, since we are settling on behalf of the receiver + qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, true, 0, 0, false); + } - if (settled || remote_snd_settled) - pn_delivery_settle(pdlv); + pn_link_advance(plink); - pn_link_advance(plink); + if (settled || remote_snd_settled) { + if (pdlv) + pn_delivery_settle(pdlv); + } + } } @@ -1089,6 +1137,7 @@ static void 
CORE_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t di qdr_delivery_set_context(dlv, 0); pn_delivery_set_context(pnd, 0); pn_delivery_settle(pnd); + qdr_delivery_set_cleared_proton_ref(dlv, true); qdr_delivery_decref(router->router_core, dlv); } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index b8f6f9c..f520871 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -1100,10 +1100,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) - def test_17_multiframe_presettled(self): - test = MultiframePresettledTest(self.address) - test.run() - self.assertEqual(None, test.error) + # Will uncomment this test once https://issues.apache.org/jira/browse/PROTON-1514 is fixed + #def test_17_multiframe_presettled(self): + # test = MultiframePresettledTest(self.address) + # test.run() + # self.assertEqual(None, test.error) def test_18_released_vs_modified(self): test = ReleasedVsModifiedTest(self.address) @@ -1135,6 +1136,11 @@ class RouterTest(TestCase): test.run() self.assertTrue(test.passed) + def test_22_large_streaming_test(self): + test = LargeMessageStreamTest(self.address) + test.run() + self.assertEqual(None, test.error) + def test_reject_disposition(self): test = RejectDispositionTest(self.address) test.run() @@ -1352,6 +1358,53 @@ class MulticastUnsettledTest(MessagingHandler): def run(self): Container(self).run() +class LargeMessageStreamTest(MessagingHandler): + def __init__(self, address): + super(LargeMessageStreamTest, self).__init__() + self.address = address + self.dest = "LargeMessageStreamTest" + self.error = None + self.count = 10 + self.n_sent = 0 + self.timer = None + self.conn = None + self.sender = None + self.receiver = None + self.n_received = 0 + self.body = "" + for i in 
range(10000): + self.body += "0123456789101112131415" + + def check_if_done(self): + if self.n_received == self.count: + self.timer.cancel() + self.conn.close() + + def timeout(self): + self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, self.n_received) + self.conn.close() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.conn = event.container.connect(self.address) + self.sender = event.container.create_sender(self.conn, self.dest) + self.receiver = event.container.create_receiver(self.conn, self.dest, name="A") + self.receiver.flow(self.count) + + def on_sendable(self, event): + for i in range(self.count): + msg = Message(body=self.body) + # send(msg) calls the stream function which streams data from sender to the router + event.sender.send(msg) + self.n_sent += 1 + + def on_message(self, event): + self.n_received += 1 + self.check_if_done() + + def run(self): + Container(self).run() + class MultiframePresettledTest(MessagingHandler): def __init__(self, address): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org