kgiusti commented on a change in pull request #751: URL: https://github.com/apache/qpid-dispatch/pull/751#discussion_r445728883
########## File path: src/message.c ##########
@@ -2178,6 +2197,130 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
 }

+void qd_message_compose_5(qd_message_t *msg,
+                          qd_composed_field_t *headers,
+                          qd_buffer_list_t *body,
+                          bool complete)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_list_t *headers_buffers = headers ? qd_compose_buffers(headers) : 0;
+
+    DEQ_INIT(content->buffers);
+    if (headers_buffers)
+        DEQ_APPEND(content->buffers, (*headers_buffers));
+
+    if (body) {
+        DEQ_APPEND(content->buffers, (*body));
+    }
+
+    content->receive_complete = complete;
+}
+
+
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_t *buf = DEQ_HEAD(*buffers);
+    int count;
+
+    LOCK(content->lock);
+    while (buf) {
+        qd_buffer_set_fanout(buf, content->fanout);
+        buf = DEQ_NEXT(buf);
+    }
+
+    DEQ_APPEND(content->buffers, (*buffers));
+    count = DEQ_SIZE(content->buffers);
+    UNLOCK(content->lock);
+    return count;
+}
+
+
+int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    if (!(msg->cursor.buffer && msg->cursor.cursor)) {
+        qd_field_location_t *loc = qd_message_field_location(in_msg, QD_FIELD_BODY);
+        if (!loc || loc->tag == QD_AMQP_NULL)
+            return 0;
+        // TODO: need to actually determine this, could be different if vbin32 sent
+        int preamble = 5;
+        if (loc->offset + preamble < qd_buffer_size(loc->buffer)) {
+            msg->cursor.buffer = loc->buffer;
+            msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + preamble;
+        } else {
+            msg->cursor.buffer = DEQ_NEXT(loc->buffer);
+            if (!msg->cursor.buffer) return 0;
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + ((loc->offset + preamble) - qd_buffer_size(loc->buffer));
+        }
+    }
+
+    qd_buffer_t *buf = msg->cursor.buffer;
+    unsigned char *cursor = msg->cursor.cursor;
+
+    // if we are at the end of the current buffer, try to move to the
+    // next buffer
+    if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) {
+        buf = DEQ_NEXT(buf);
+        if (buf) {
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {
+            return 0;
+        }
+    }
+
+    int count;
+    for (count = 0; count < length && buf; count++) {
+        buffers[count].bytes = (char*) qd_buffer_base(buf);
+        buffers[count].capacity = qd_buffer_size(buf);
+        buffers[count].size = qd_buffer_size(buf);
+        buffers[count].offset = cursor - qd_buffer_base(buf);
+        buffers[count].context = (uintptr_t) buf;
+        buf = DEQ_NEXT(buf);
+        if (buf) {
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + qd_buffer_size(msg->cursor.buffer);
+        }
+    }
+    return count;
+}
+
+
+void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count)
+{
+    qd_message_pvt_t *pvt = (qd_message_pvt_t*) msg;
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_t *buf;
+
+    LOCK(content->lock);
+    //
+    // Decrement the buffer fanout for each of the referenced buffers.
+    //
+    if (pvt->is_fanout) {
+        for (int i = 0; i < buffer_count; i++) {
+            buf = (qd_buffer_t*) buffers[i].context;
+            qd_buffer_dec_fanout(buf);

Review comment:
   Will this need to handle release of Q2 flow control on the corresponding incoming link?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org