[ 
https://issues.apache.org/jira/browse/DISPATCH-767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16078265#comment-16078265
 ] 

ASF GitHub Bot commented on DISPATCH-767:
-----------------------------------------

Github user alanconway commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901
  
    --- Diff: src/message.c ---
    @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg,
     {
         qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
         qd_message_content_t *content = msg->content;
    -    qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
    -    unsigned char        *cursor;
    +    qd_buffer_t          *buf     = 0;
         pn_link_t            *pnl     = qd_link_pn(link);
     
    -    qd_buffer_list_t new_ma;
    -    DEQ_INIT(new_ma);
    +    // How many receivers does this message have?
    +    int                  fanout   = qd_message_fanout(in_msg);
     
    -    // Process  the message annotations if any
    -    compose_message_annotations(msg, &new_ma, strip_annotations);
    +    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
    +        //
    +        // Start with the very first buffer;
    +        //
    +        buf = DEQ_HEAD(content->buffers);
     
    -    //
    -    // This is the case where the message annotations have been modified.
    -    // The message send must be divided into sections:  The existing 
header;
    -    // the new message annotations; the rest of the existing message.
    -    // Note that the original message annotations that are still in the
    -    // buffer chain must not be sent.
    -    //
    -    // Start by making sure that we've parsed the message sections through
    -    // the message annotations
    -    //
    -    // ??? NO LONGER NECESSARY???
    -    if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    -        qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", 
qd_error_message);
    -        return;
    -    }
    +        if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) {
    +            qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", 
qd_error_message);
    +            return;
    +        }
     
    -    //
    -    // Send header if present
    -    //
    -    cursor = qd_buffer_base(buf);
    -    if (content->section_message_header.length > 0) {
    -        buf    = content->section_message_header.buffer;
    -        cursor = content->section_message_header.offset + 
qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_message_header.length + 
content->section_message_header.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send header if present
    +        //
    +        unsigned char *cursor = qd_buffer_base(buf);
    +        int header_consume = content->section_message_header.length + 
content->section_message_header.hdr_length;
    +        if (content->section_message_header.length > 0) {
    +            buf    = content->section_message_header.buffer;
    +            cursor = content->section_message_header.offset + 
qd_buffer_base(buf);
    +            advance(&cursor, &buf, header_consume, send_handler, (void*) 
pnl);
    +        }
     
    -    //
    -    // Send delivery annotation if present
    -    //
    -    if (content->section_delivery_annotation.length > 0) {
    -        buf    = content->section_delivery_annotation.buffer;
    -        cursor = content->section_delivery_annotation.offset + 
qd_buffer_base(buf);
    -        advance(&cursor, &buf,
    -                content->section_delivery_annotation.length + 
content->section_delivery_annotation.hdr_length,
    -                send_handler, (void*) pnl);
    -    }
    +        //
    +        // Send delivery annotation if present
    +        //
    +        int da_consume = content->section_delivery_annotation.length + 
content->section_delivery_annotation.hdr_length;
    +        if (content->section_delivery_annotation.length > 0) {
    +            buf    = content->section_delivery_annotation.buffer;
    +            cursor = content->section_delivery_annotation.offset + 
qd_buffer_base(buf);
    +            advance(&cursor, &buf, da_consume, send_handler, (void*) pnl);
    +        }
     
    -    //
    -    // Send new message annotations
    -    //
    -    qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    -    while (da_buf) {
    -        char *to_send = (char*) qd_buffer_base(da_buf);
    -        pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    -        da_buf = DEQ_NEXT(da_buf);
    -    }
    -    qd_buffer_list_free_buffers(&new_ma);
    +        qd_buffer_list_t new_ma;
    +        DEQ_INIT(new_ma);
     
    -    //
    -    // Skip over replaced message annotations
    -    //
    -    if (content->section_message_annotation.length > 0)
    -        advance(&cursor, &buf,
    -                content->section_message_annotation.hdr_length + 
content->section_message_annotation.length,
    -                0, 0);
    +        // Process  the message annotations if any
    +        compose_message_annotations(msg, &new_ma, strip_annotations);
    +
    +        //
    +        // Send new message annotations
    +        //
    +        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
    +        while (da_buf) {
    +            char *to_send = (char*) qd_buffer_base(da_buf);
    +            pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
    +            da_buf = DEQ_NEXT(da_buf);
    +        }
    +        qd_buffer_list_free_buffers(&new_ma);
    +
    +        //
    +        // Skip over replaced message annotations
    +        //
    +        int ma_consume = content->section_message_annotation.hdr_length + 
content->section_message_annotation.length;
    +        if (content->section_message_annotation.length > 0)
    +            advance(&cursor, &buf, ma_consume, 0, 0);
    +
    +        msg->cursor.buffer = buf;
    +
    +        //
    +        // If this message has no header and no delivery annotations and 
no message annotations, set the offset to 0.
    +        //
    +        if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
    +            msg->cursor.offset = 0;
    +        else
    +            msg->cursor.offset = cursor - qd_buffer_base(buf);
    +
    +        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
     
    -    //
    -    // Send remaining partial buffer
    -    //
    -    if (buf) {
    -        size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
    -        advance(&cursor, &buf, len, send_handler, (void*) pnl);
         }
     
    -    // Fall through to process the remaining buffers normally
    -    // Note that 'advance' will have moved us to the next buffer in the 
chain.
    +    buf = msg->cursor.buffer;
     
    +    if (!buf)
    +        return;
    +
    +    bool receive_complete = qd_message_receive_complete(in_msg);
     
         while (buf) {
    -        pn_link_send(pnl, (char*) qd_buffer_base(buf), 
qd_buffer_size(buf));
    -        buf = DEQ_NEXT(buf);
    +        size_t buf_size = qd_buffer_size(buf);
    +
    +        // This will send the remaining data in the buffer if any.
    +        pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset), 
buf_size - msg->cursor.offset);
    +
    +        // If the entire message has been received,  there is no need to 
lock before sending because no one else is
    +        // trying to modify the data structure.
    +        if (!receive_complete)
    +            sys_mutex_lock(msg->content->lock);
    --- End diff --
    
    I wouldn't bother with the conditional lock - if there's no contention, the 
cost of the lock is small, and the condition introduces one more way for a 
future programmer to screw up the thread-safety logic by mistake.


> Message Cut-Through/Streaming for efficient handling of large messages
> ----------------------------------------------------------------------
>
>                 Key: DISPATCH-767
>                 URL: https://issues.apache.org/jira/browse/DISPATCH-767
>             Project: Qpid Dispatch
>          Issue Type: Improvement
>          Components: Router Node
>            Reporter: Ted Ross
>            Assignee: Ganesh Murthy
>             Fix For: 1.0.0
>
>
> When large, multi-frame messages are sent through the router, there is no 
> need to wait for the entire message to arrive before starting to send it 
> onward.
> This feature causes the router to route the first frame and allow subsequent 
> frames in a delivery to be streamed out in pipeline fashion.  Ideally, the 
> memory usage in the router should only involve pending frames.  This would 
> allow the router to handle arbitrary numbers of concurrent arbitrarily large 
> messages.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to