[
https://issues.apache.org/jira/browse/DISPATCH-767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078265#comment-16078265
]
ASF GitHub Bot commented on DISPATCH-767:
-----------------------------------------
Github user alanconway commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901
--- Diff: src/message.c ---
@@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg,
{
qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
qd_message_content_t *content = msg->content;
- qd_buffer_t *buf = DEQ_HEAD(content->buffers);
- unsigned char *cursor;
+ qd_buffer_t *buf = 0;
pn_link_t *pnl = qd_link_pn(link);
- qd_buffer_list_t new_ma;
- DEQ_INIT(new_ma);
+ // How many receivers does this message have?
+ int fanout = qd_message_fanout(in_msg);
- // Process the message annotations if any
- compose_message_annotations(msg, &new_ma, strip_annotations);
+ if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
+ //
+ // Start with the very first buffer;
+ //
+ buf = DEQ_HEAD(content->buffers);
- //
- // This is the case where the message annotations have been modified.
- // The message send must be divided into sections: The existing
header;
- // the new message annotations; the rest of the existing message.
- // Note that the original message annotations that are still in the
- // buffer chain must not be sent.
- //
- // Start by making sure that we've parsed the message sections through
- // the message annotations
- //
- // ??? NO LONGER NECESSARY???
- if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
- qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s",
qd_error_message);
- return;
- }
+ if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
+ qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s",
qd_error_message);
+ return;
+ }
- //
- // Send header if present
- //
- cursor = qd_buffer_base(buf);
- if (content->section_message_header.length > 0) {
- buf = content->section_message_header.buffer;
- cursor = content->section_message_header.offset +
qd_buffer_base(buf);
- advance(&cursor, &buf,
- content->section_message_header.length +
content->section_message_header.hdr_length,
- send_handler, (void*) pnl);
- }
+ //
+ // Send header if present
+ //
+ unsigned char *cursor = qd_buffer_base(buf);
+ int header_consume = content->section_message_header.length +
content->section_message_header.hdr_length;
+ if (content->section_message_header.length > 0) {
+ buf = content->section_message_header.buffer;
+ cursor = content->section_message_header.offset +
qd_buffer_base(buf);
+ advance(&cursor, &buf, header_consume, send_handler, (void*)
pnl);
+ }
- //
- // Send delivery annotation if present
- //
- if (content->section_delivery_annotation.length > 0) {
- buf = content->section_delivery_annotation.buffer;
- cursor = content->section_delivery_annotation.offset +
qd_buffer_base(buf);
- advance(&cursor, &buf,
- content->section_delivery_annotation.length +
content->section_delivery_annotation.hdr_length,
- send_handler, (void*) pnl);
- }
+ //
+ // Send delivery annotation if present
+ //
+ int da_consume = content->section_delivery_annotation.length +
content->section_delivery_annotation.hdr_length;
+ if (content->section_delivery_annotation.length > 0) {
+ buf = content->section_delivery_annotation.buffer;
+ cursor = content->section_delivery_annotation.offset +
qd_buffer_base(buf);
+ advance(&cursor, &buf, da_consume, send_handler, (void*) pnl);
+ }
- //
- // Send new message annotations
- //
- qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
- while (da_buf) {
- char *to_send = (char*) qd_buffer_base(da_buf);
- pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
- da_buf = DEQ_NEXT(da_buf);
- }
- qd_buffer_list_free_buffers(&new_ma);
+ qd_buffer_list_t new_ma;
+ DEQ_INIT(new_ma);
- //
- // Skip over replaced message annotations
- //
- if (content->section_message_annotation.length > 0)
- advance(&cursor, &buf,
- content->section_message_annotation.hdr_length +
content->section_message_annotation.length,
- 0, 0);
+ // Process the message annotations if any
+ compose_message_annotations(msg, &new_ma, strip_annotations);
+
+ //
+ // Send new message annotations
+ //
+ qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
+ while (da_buf) {
+ char *to_send = (char*) qd_buffer_base(da_buf);
+ pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
+ da_buf = DEQ_NEXT(da_buf);
+ }
+ qd_buffer_list_free_buffers(&new_ma);
+
+ //
+ // Skip over replaced message annotations
+ //
+ int ma_consume = content->section_message_annotation.hdr_length +
content->section_message_annotation.length;
+ if (content->section_message_annotation.length > 0)
+ advance(&cursor, &buf, ma_consume, 0, 0);
+
+ msg->cursor.buffer = buf;
+
+ //
+ // If this message has no header and no delivery annotations and
no message annotations, set the offset to 0.
+ //
+ if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
+ msg->cursor.offset = 0;
+ else
+ msg->cursor.offset = cursor - qd_buffer_base(buf);
+
+ msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
- //
- // Send remaining partial buffer
- //
- if (buf) {
- size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
- advance(&cursor, &buf, len, send_handler, (void*) pnl);
}
- // Fall through to process the remaining buffers normally
- // Note that 'advance' will have moved us to the next buffer in the
chain.
+ buf = msg->cursor.buffer;
+ if (!buf)
+ return;
+
+ bool receive_complete = qd_message_receive_complete(in_msg);
while (buf) {
- pn_link_send(pnl, (char*) qd_buffer_base(buf),
qd_buffer_size(buf));
- buf = DEQ_NEXT(buf);
+ size_t buf_size = qd_buffer_size(buf);
+
+ // This will send the remaining data in the buffer if any.
+ pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset),
buf_size - msg->cursor.offset);
+
+ // If the entire message has been received, there is no need to
lock before sending because no one else is
+ // trying to modify the data structure.
+ if (!receive_complete)
+ sys_mutex_lock(msg->content->lock);
--- End diff --
I wouldn't bother with the conditional lock - if there's no contention the
cost of the lock is small, and the condition introduces one more way for a
future programmer to screw up the thread safety logic by mistake.
> Message Cut-Through/Streaming for efficient handling of large messages
> ----------------------------------------------------------------------
>
> Key: DISPATCH-767
> URL: https://issues.apache.org/jira/browse/DISPATCH-767
> Project: Qpid Dispatch
> Issue Type: Improvement
> Components: Router Node
> Reporter: Ted Ross
> Assignee: Ganesh Murthy
> Fix For: 1.0.0
>
>
> When large, multi-frame messages are sent through the router, there is no
> need to wait for the entire message to arrive before starting to send it
> onward.
> This feature causes the router to route the first frame and allow subsequent
> frames in a delivery to be streamed out in pipeline fashion. Ideally, the
> memory usage in the router should only involve pending frames. This would
> allow the router to handle arbitrary numbers of concurrent arbitrarily large
> messages.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]