This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 20f841a982cab439cc1e902f0b4f961d21133b60 Author: Ted Ross <tr...@apache.org> AuthorDate: Fri Jun 19 13:19:12 2020 -0400 Dataplane: (from gsim) Implementation of qd_message_read_body. --- src/message.c | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/message.c b/src/message.c index 97bcab2..55d134e 100644 --- a/src/message.c +++ b/src/message.c @@ -2237,6 +2237,61 @@ int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers) } +int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int length) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + if (!(msg->cursor.buffer && msg->cursor.cursor)) { + qd_field_location_t *loc = qd_message_field_location(in_msg, QD_FIELD_BODY); + if (!loc || loc->tag == QD_AMQP_NULL) + return 0; + // TODO: need to actually determine this, could be different if vbin32 sent + int preamble = 5; + if (loc->offset + preamble < qd_buffer_size(loc->buffer)) { + msg->cursor.buffer = loc->buffer; + msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + preamble; + } else { + msg->cursor.buffer = DEQ_NEXT(loc->buffer); + if (!msg->cursor.buffer) return 0; + msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + ((loc->offset + preamble) - qd_buffer_size(loc->buffer)); + } + } + + qd_buffer_t *buf = msg->cursor.buffer; + unsigned char *cursor = msg->cursor.cursor; + + // if we are at the end of the current buffer, try to move to the + // next buffer + if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) { + buf = DEQ_NEXT(buf); + if (buf) { + cursor = qd_buffer_base(buf); + msg->cursor.buffer = buf; + msg->cursor.cursor = cursor; + } else { + return 0; + } + } + + int count; + for (count = 0; count < length && buf; count++) { + buffers[count].bytes = (char*) qd_buffer_base(buf); + buffers[count].capacity = qd_buffer_size(buf); + buffers[count].size = qd_buffer_size(buf); + buffers[count].offset = cursor - qd_buffer_base(buf); + buffers[count].context = (uintptr_t) buf; + buf = DEQ_NEXT(buf); + if (buf) { + cursor = qd_buffer_base(buf); + msg->cursor.buffer = buf; + msg->cursor.cursor = cursor; + } else { + msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + qd_buffer_size(msg->cursor.buffer); + } + } + return count; +} + + qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg) { return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org