kgiusti commented on a change in pull request #751:
URL: https://github.com/apache/qpid-dispatch/pull/751#discussion_r445728883



##########
File path: src/message.c
##########
@@ -2178,6 +2197,130 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com
 }
 
 
+void qd_message_compose_5(qd_message_t        *msg,
+                          qd_composed_field_t *headers,
+                          qd_buffer_list_t    *body,
+                          bool                 complete)
+{
+    qd_message_content_t *content         = MSG_CONTENT(msg);
+    qd_buffer_list_t     *headers_buffers = headers ? qd_compose_buffers(headers) : 0;
+
+    DEQ_INIT(content->buffers);
+    if (headers_buffers)
+        DEQ_APPEND(content->buffers, (*headers_buffers));
+
+    if (body) {
+        DEQ_APPEND(content->buffers, (*body));
+    }
+
+    content->receive_complete = complete;
+}
+
+
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers)
+{
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_t          *buf     = DEQ_HEAD(*buffers);
+    int                   count;
+
+    LOCK(content->lock);
+    while (buf) {
+        qd_buffer_set_fanout(buf, content->fanout);
+        buf = DEQ_NEXT(buf);
+    }
+
+    DEQ_APPEND(content->buffers, (*buffers));
+    count = DEQ_SIZE(content->buffers);
+    UNLOCK(content->lock);
+    return count;
+}
+
+
+int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length)
+{
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    if (!(msg->cursor.buffer && msg->cursor.cursor)) {
+        qd_field_location_t  *loc     = qd_message_field_location(in_msg, QD_FIELD_BODY);
+        if (!loc || loc->tag == QD_AMQP_NULL)
+            return 0;
+        // TODO: need to actually determine this, could be different if vbin32 sent
+        int preamble = 5;
+        if (loc->offset + preamble < qd_buffer_size(loc->buffer)) {
+            msg->cursor.buffer = loc->buffer;
+            msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + preamble;
+        } else {
+            msg->cursor.buffer = DEQ_NEXT(loc->buffer);
+            if (!msg->cursor.buffer) return 0;
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + ((loc->offset + preamble) - qd_buffer_size(loc->buffer));
+        }
+    }
+
+    qd_buffer_t   *buf    = msg->cursor.buffer;
+    unsigned char *cursor = msg->cursor.cursor;
+
+    // if we are at the end of the current buffer, try to move to the
+    // next buffer
+    if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) {
+        buf = DEQ_NEXT(buf);
+        if (buf) {
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {
+            return 0;
+        }
+    }
+
+    int count;
+    for (count = 0; count < length && buf; count++) {
+        buffers[count].bytes = (char*) qd_buffer_base(buf);
+        buffers[count].capacity = qd_buffer_size(buf);
+        buffers[count].size = qd_buffer_size(buf);
+        buffers[count].offset = cursor - qd_buffer_base(buf);
+        buffers[count].context = (uintptr_t) buf;
+        buf = DEQ_NEXT(buf);
+        if (buf) {
+            cursor = qd_buffer_base(buf);
+            msg->cursor.buffer = buf;
+            msg->cursor.cursor = cursor;
+        } else {
+            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + qd_buffer_size(msg->cursor.buffer);
+        }
+    }
+    return count;
+}
+
+
+void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int buffer_count)
+{
+    qd_message_pvt_t     *pvt     = (qd_message_pvt_t*) msg;
+    qd_message_content_t *content = MSG_CONTENT(msg);
+    qd_buffer_t          *buf;
+
+    LOCK(content->lock);
+    //
+    // Decrement the buffer fanout for each of the referenced buffers.
+    //
+    if (pvt->is_fanout) {
+        for (int i = 0; i < buffer_count; i++) {
+            buf = (qd_buffer_t*) buffers[i].context;
+            qd_buffer_dec_fanout(buf);

Review comment:
       Will this need to handle release of Q2 flow control on the corresponding 
incoming link?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to