[ https://issues.apache.org/jira/browse/DISPATCH-767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078265#comment-16078265 ]
ASF GitHub Bot commented on DISPATCH-767: ----------------------------------------- Github user alanconway commented on a diff in the pull request: https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901 --- Diff: src/message.c --- @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg, { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; qd_message_content_t *content = msg->content; - qd_buffer_t *buf = DEQ_HEAD(content->buffers); - unsigned char *cursor; + qd_buffer_t *buf = 0; pn_link_t *pnl = qd_link_pn(link); - qd_buffer_list_t new_ma; - DEQ_INIT(new_ma); + // How many receivers does this message have? + int fanout = qd_message_fanout(in_msg); - // Process the message annotations if any - compose_message_annotations(msg, &new_ma, strip_annotations); + if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { + // + // Start with the very first buffer; + // + buf = DEQ_HEAD(content->buffers); - // - // This is the case where the message annotations have been modified. - // The message send must be divided into sections: The existing header; - // the new message annotations; the rest of the existing message. - // Note that the original message annotations that are still in the - // buffer chain must not be sent. - // - // Start by making sure that we've parsed the message sections through - // the message annotations - // - // ??? NO LONGER NECESSARY??? 
- if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) { - qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message); - return; - } + if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) { + qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message); + return; + } - // - // Send header if present - // - cursor = qd_buffer_base(buf); - if (content->section_message_header.length > 0) { - buf = content->section_message_header.buffer; - cursor = content->section_message_header.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_message_header.length + content->section_message_header.hdr_length, - send_handler, (void*) pnl); - } + // + // Send header if present + // + unsigned char *cursor = qd_buffer_base(buf); + int header_consume = content->section_message_header.length + content->section_message_header.hdr_length; + if (content->section_message_header.length > 0) { + buf = content->section_message_header.buffer; + cursor = content->section_message_header.offset + qd_buffer_base(buf); + advance(&cursor, &buf, header_consume, send_handler, (void*) pnl); + } - // - // Send delivery annotation if present - // - if (content->section_delivery_annotation.length > 0) { - buf = content->section_delivery_annotation.buffer; - cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length, - send_handler, (void*) pnl); - } + // + // Send delivery annotation if present + // + int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length; + if (content->section_delivery_annotation.length > 0) { + buf = content->section_delivery_annotation.buffer; + cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); + advance(&cursor, &buf, da_consume, send_handler, (void*) pnl); + } - // - // Send new message annotations - // - 
qd_buffer_t *da_buf = DEQ_HEAD(new_ma); - while (da_buf) { - char *to_send = (char*) qd_buffer_base(da_buf); - pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); - da_buf = DEQ_NEXT(da_buf); - } - qd_buffer_list_free_buffers(&new_ma); + qd_buffer_list_t new_ma; + DEQ_INIT(new_ma); - // - // Skip over replaced message annotations - // - if (content->section_message_annotation.length > 0) - advance(&cursor, &buf, - content->section_message_annotation.hdr_length + content->section_message_annotation.length, - 0, 0); + // Process the message annotations if any + compose_message_annotations(msg, &new_ma, strip_annotations); + + // + // Send new message annotations + // + qd_buffer_t *da_buf = DEQ_HEAD(new_ma); + while (da_buf) { + char *to_send = (char*) qd_buffer_base(da_buf); + pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); + da_buf = DEQ_NEXT(da_buf); + } + qd_buffer_list_free_buffers(&new_ma); + + // + // Skip over replaced message annotations + // + int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length; + if (content->section_message_annotation.length > 0) + advance(&cursor, &buf, ma_consume, 0, 0); + + msg->cursor.buffer = buf; + + // + // If this message has no header and no delivery annotations and no message annotations, set the offset to 0. + // + if (header_consume == 0 && da_consume == 0 && ma_consume ==0) + msg->cursor.offset = 0; + else + msg->cursor.offset = cursor - qd_buffer_base(buf); + + msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS; - // - // Send remaining partial buffer - // - if (buf) { - size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf)); - advance(&cursor, &buf, len, send_handler, (void*) pnl); } - // Fall through to process the remaining buffers normally - // Note that 'advance' will have moved us to the next buffer in the chain. 
+ buf = msg->cursor.buffer; + if (!buf) + return; + + bool receive_complete = qd_message_receive_complete(in_msg); while (buf) { - pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf)); - buf = DEQ_NEXT(buf); + size_t buf_size = qd_buffer_size(buf); + + // This will send the remaining data in the buffer if any. + pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset), buf_size - msg->cursor.offset); + + // If the entire message has been received, there is no need to lock before sending because no one else is + // trying to modify the data structure. + if (!receive_complete) + sys_mutex_lock(msg->content->lock);
--- End diff --

I wouldn't bother with the conditional lock - if there's no contention the cost of the lock is small, and the condition introduces one more way for a future programmer to screw up the thread-safety logic by mistake.

> Message Cut-Through/Streaming for efficient handling of large messages
> ----------------------------------------------------------------------
>
> Key: DISPATCH-767
> URL: https://issues.apache.org/jira/browse/DISPATCH-767
> Project: Qpid Dispatch
> Issue Type: Improvement
> Components: Router Node
> Reporter: Ted Ross
> Assignee: Ganesh Murthy
> Fix For: 1.0.0
>
>
> When large, multi-frame messages are sent through the router, there is no
> need to wait for the entire message to arrive before starting to send it
> onward.
> This feature causes the router to route the first frame and allow subsequent
> frames in a delivery to be streamed out in pipeline fashion. Ideally, the
> memory usage in the router should only involve pending frames. This would
> allow the router to handle arbitrary numbers of concurrent arbitrarily large
> messages.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org