This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 5d021e224119f78ce043d7c1fc4d94b76f49e7c0 Author: Ted Ross <tr...@apache.org> AuthorDate: Fri Aug 7 17:30:34 2020 -0400 DISPATCH-1742 - Completed implementation of outbound streaming path --- src/adaptors/reference_adaptor.c | 89 ++++++++++++++++++++++----------- src/message.c | 103 ++++++++++++++++++++++++++++++++++----- src/message_private.h | 1 + 3 files changed, 154 insertions(+), 39 deletions(-) diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index b6b0c1b..3c05bd8 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -247,39 +247,72 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t switch (status) { case QD_MESSAGE_DEPTH_OK: { + // + // At least one complete body performative has arrived. It is now safe to switch + // over to the per-message extraction of body-data segments. + // printf("qdr_ref_deliver: depth ok\n"); qd_message_body_data_t *body_data; qd_message_body_data_result_t body_data_result; - body_data_result = qd_message_next_body_data(msg, &body_data); - - switch (body_data_result) { - case QD_MESSAGE_BODY_DATA_OK: { - qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data); - char *body = (char*) qd_iterator_copy(body_iter); - printf("qdr_ref_deliver: message body-data received: %s\n", body); - free(body); - qd_iterator_free(body_iter); - break; - } + + // + // Process as many body-data segments as are available. + // + while (true) { + body_data_result = qd_message_next_body_data(msg, &body_data); + + switch (body_data_result) { + case QD_MESSAGE_BODY_DATA_OK: { + // + // We have a new valid body-data segment. Handle it + // + printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data)); + + qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data); + char *body = (char*) qd_iterator_copy(body_iter); + printf("qdr_ref_deliver: message body-data received: %s\n", body); + free(body); + qd_iterator_free(body_iter); + qd_message_body_data_release(body_data); + break; + } - case QD_MESSAGE_BODY_DATA_INCOMPLETE: - printf("qdr_ref_deliver: body-data incomplete\n"); - break; - - case QD_MESSAGE_BODY_DATA_NO_MORE: - qd_message_set_send_complete(msg); - qdr_link_flow(adaptor->core, link, 1, false); - return PN_ACCEPTED; // This will cause the delivery to be settled + case QD_MESSAGE_BODY_DATA_INCOMPLETE: + // + // A new segment has not completely arrived yet. Check again later. + // + printf("qdr_ref_deliver: body-data incomplete\n"); + return 0; + + case QD_MESSAGE_BODY_DATA_NO_MORE: + // + // We have already handled the last body-data segment for this delivery. + // Complete the "sending" of this delivery and replenish credit. + // + // Note that depending on the adaptor, it might be desirable to delay the + // acceptance and settlement of this delivery until a later event (i.e. when + // a requested action has completed). + // + qd_message_set_send_complete(msg); + qdr_link_flow(adaptor->core, link, 1, false); + return PN_ACCEPTED; // This will cause the delivery to be settled - case QD_MESSAGE_BODY_DATA_INVALID: - printf("qdr_ref_deliver: body-data invalid\n"); - qdr_link_flow(adaptor->core, link, 1, false); - return PN_REJECTED; - - case QD_MESSAGE_BODY_DATA_NOT_DATA: - printf("qdr_ref_deliver: body not data\n"); - qdr_link_flow(adaptor->core, link, 1, false); - return PN_REJECTED; + case QD_MESSAGE_BODY_DATA_INVALID: + // + // The body-data is corrupt in some way. Stop handling the delivery and reject it. + // + printf("qdr_ref_deliver: body-data invalid\n"); + qdr_link_flow(adaptor->core, link, 1, false); + return PN_REJECTED; + + case QD_MESSAGE_BODY_DATA_NOT_DATA: + // + // Valid data was seen, but it is not a body-data performative. Reject the delivery. + // + printf("qdr_ref_deliver: body not data\n"); + qdr_link_flow(adaptor->core, link, 1, false); + return PN_REJECTED; + } } break; diff --git a/src/message.c b/src/message.c index 24aa928..69312f3 100644 --- a/src/message.c +++ b/src/message.c @@ -658,7 +658,8 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, const unsigned char *pattern, int pattern_length, const unsigned char *expected_tags, - qd_field_location_t *location) + qd_field_location_t *location, + bool dup_ok) { if (!*cursor || !can_advance(cursor, buffer)) return QD_SECTION_NEED_MORE; @@ -691,7 +692,7 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, if (*expected_tags == 0) return QD_SECTION_INVALID; // Error: Unexpected tag - if (location->parsed) + if (location->parsed && !dup_ok) return QD_SECTION_INVALID; // Error: Duplicate section // @@ -1916,9 +1917,9 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co return QD_MESSAGE_DEPTH_OK; qd_section_status_t rc; - rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location); + rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location, false); if (rc == QD_SECTION_NO_MATCH) // try the alternative - rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location); + rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location, false); if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) { content->parse_depth = depth; @@ -2302,8 +2303,49 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field) */ static void find_last_buffer(qd_field_location_t *location, unsigned char **cursor, qd_buffer_t **buffer) { - //qd_buffer_t *buf = location->buffer; - + qd_buffer_t *buf = location->buffer; + size_t remaining = location->hdr_length + location->length; + + while (!!buf && remaining > 0) { + size_t this_buf_size = qd_buffer_size(buf) - (buf == location->buffer ? location->offset : 0); + if (remaining <= this_buf_size) { + *buffer = buf; + *cursor = qd_buffer_base(buf) + (buf == location->buffer ? location->offset : 0) + remaining; + return; + } + remaining -= this_buf_size; + buf = DEQ_NEXT(buf); + } + + assert(false); // The field should already have been validated as complete. +} + + +void trim_body_data_headers(qd_message_body_data_t *body_data) +{ + const qd_field_location_t *location = &body_data->section; + qd_buffer_t *buffer = location->buffer; + unsigned char *cursor = qd_buffer_base(buffer) + location->offset; + + bool good = advance(&cursor, &buffer, location->hdr_length); + assert(good); + if (good) { + unsigned char tag = 0; + next_octet(&cursor, &buffer, &tag); + if (tag == QD_AMQP_VBIN8) + advance(&cursor, &buffer, 1); + else if (tag == QD_AMQP_VBIN32) + advance(&cursor, &buffer, 4); + + can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary + + body_data->payload.buffer = buffer; + body_data->payload.offset = cursor - qd_buffer_base(buffer); + body_data->payload.length = location->length; + body_data->payload.hdr_length = 0; + body_data->payload.parsed = true; + body_data->payload.tag = tag; + } } @@ -2317,18 +2359,27 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs */ qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data) { - return 0; + const qd_field_location_t *location = &body_data->payload; + + return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL); } /** * qd_message_body_data_buffer_count * - * Return the number of buffers contained in the body_data object. + * Return the number of buffers contained in payload portion of the body_data object. */ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data) { - return 0; + int count = 1; + qd_buffer_t *buffer = body_data->payload.buffer; + while (!!buffer && buffer != body_data->last_buffer) { + buffer = DEQ_NEXT(buffer); + count++; + } + + return count; } @@ -2341,7 +2392,34 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data) */ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count) { - return 0; + int actual_count = 0; + qd_buffer_t *buffer = body_data->payload.buffer; + + // + // Skip the offset + // + while (offset > 0 && !!buffer) { + buffer = DEQ_NEXT(buffer); + offset--; + } + + // + // Fill the buffer array + // + int idx = 0; + while (idx < count && !!buffer) { + buffers[idx].context = 0; + buffers[idx].bytes = (char*) qd_buffer_base(buffer) + (buffer == body_data->payload.buffer ? body_data->payload.offset : 0); + buffers[idx].capacity = BUFFER_SIZE; + buffers[idx].size = qd_buffer_size(buffer) - (buffer == body_data->payload.buffer ? body_data->payload.offset : 0); + buffers[idx].offset = 0; + + buffer = DEQ_NEXT(buffer); + actual_count++; + idx++; + } + + return actual_count; } @@ -2353,6 +2431,7 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe */ void qd_message_body_data_release(qd_message_body_data_t *body_data) { + free_qd_message_body_data_t(body_data); } @@ -2374,6 +2453,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer); body_data->last_buffer = msg->body_buffer; + trim_body_data_headers(body_data); assert(DEQ_SIZE(msg->body_data_list) == 0); DEQ_INSERT_TAIL(msg->body_data_list, body_data); @@ -2390,7 +2470,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd section_status = message_section_check(&msg->body_buffer, &msg->body_cursor, BODY_DATA_SHORT, 3, TAGS_BINARY, - &location); + &location, true); switch (section_status) { case QD_SECTION_INVALID: @@ -2404,6 +2484,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd body_data->section = location; find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer); body_data->last_buffer = msg->body_buffer; + trim_body_data_headers(body_data); DEQ_INSERT_TAIL(msg->body_data_list, body_data); *out_body_data = body_data; return QD_MESSAGE_BODY_DATA_OK; diff --git a/src/message_private.h b/src/message_private.h index 6dc901a..20f4ce4 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -65,6 +65,7 @@ struct qd_message_body_data_t { DEQ_LINKS(qd_message_body_data_t); // Linkage to form a DEQ qd_message_pvt_t *owning_message; // Pointer to the owning message qd_field_location_t section; // Section descriptor for the field + qd_field_location_t payload; // Descriptor for the payload of the body data qd_buffer_t *last_buffer; // Pointer to the last buffer in the field }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org