This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit df0568a8d4f03d04fe5046ee4159c330bb4291ca Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Jul 15 17:25:35 2020 -0400 Dataplane: Fixed message parsing so it can handle partial and streaming content. --- src/message.c | 256 +++++++++++++++++++++++++++++++------------------- src/message_private.h | 8 +- 2 files changed, 165 insertions(+), 99 deletions(-) diff --git a/src/message.c b/src/message.c index 7f11e5e..c00b909 100644 --- a/src/message.c +++ b/src/message.c @@ -122,7 +122,6 @@ static void quote(char* bytes, int n, char **begin, char *end) { /** * Populates the buffer with formatted epoch_time */ -//static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len) static void format_time(pn_timestamp_t epoch_time, char *format, char *buffer, size_t len) { struct timeval local_timeval; @@ -369,36 +368,59 @@ char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits f return buffer; } + +/** + * Return true if there is at least one consumable octet in the buffer chain + * starting at *cursor. If the cursor is beyond the end of the buffer, and there + * is another buffer in the chain, move the cursor and buffer pointers to reference + * the first octet in the next buffer. Note that this movement does NOT constitute + * advancement of the cursor in the buffer chain. + */ +static bool can_advance(unsigned char **cursor, qd_buffer_t **buffer) +{ + if (qd_buffer_cursor(*buffer) > *cursor) + return true; + + if (DEQ_NEXT(*buffer)) { + *buffer = DEQ_NEXT(*buffer); + *cursor = qd_buffer_base(*buffer); + } + + return qd_buffer_cursor(*buffer) > *cursor; +} + + /** * Advance cursor through buffer chain by 'consume' bytes. * Cursor and buffer args are advanced to point to new position in buffer chain. * - if the number of bytes in the buffer chain is less than or equal to - * the consume number then set *cursor and *buffer to NULL and - * return the number of missing bytes + * the consume number then return false * - the original buffer chain is not changed or freed. * * @param cursor Pointer into current buffer content * @param buffer pointer to current buffer * @param consume number of bytes to advance - * @return 0 if all bytes consumed, != 0 if not enough bytes available + * @return true if all bytes consumed, false if not enough bytes available */ -static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) +static bool advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) { + if (!can_advance(cursor, buffer)) + return false; + unsigned char *local_cursor = *cursor; qd_buffer_t *local_buffer = *buffer; int remaining = qd_buffer_cursor(local_buffer) - local_cursor; while (consume > 0) { - if (consume < remaining) { + if (consume <= remaining) { local_cursor += consume; consume = 0; } else { + if (!local_buffer->next) + return false; + consume -= remaining; local_buffer = local_buffer->next; - if (local_buffer == 0){ - local_cursor = 0; - break; - } local_cursor = qd_buffer_base(local_buffer); remaining = qd_buffer_size(local_buffer); } @@ -407,7 +429,7 @@ static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) *cursor = local_cursor; *buffer = local_buffer; - return consume; + return true; } @@ -457,21 +479,29 @@ static void advance_guarded(unsigned char **cursor, qd_buffer_t **buffer, int co } -static unsigned char next_octet(unsigned char **cursor, qd_buffer_t **buffer) +/** + * If there is an octet to be consumed, put it in octet and return true, else return false. + */ +static bool next_octet(unsigned char **cursor, qd_buffer_t **buffer, unsigned char *octet) { - unsigned char result = **cursor; - advance(cursor, buffer, 1); - return result; + if (can_advance(cursor, buffer)) { + *octet = **cursor; + advance(cursor, buffer, 1); + return true; + } + return false; } -static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field) +static bool traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field_location_t *field) { qd_buffer_t *start_buffer = *buffer; unsigned char *start_cursor = *cursor; + unsigned char tag; + unsigned char octet; - unsigned char tag = next_octet(cursor, buffer); - if (!(*cursor)) return 0; + if (!next_octet(cursor, buffer, &tag)) + return false; int consume = 0; size_t hdr_length = 1; @@ -500,23 +530,33 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field case 0xD0 : case 0xF0 : hdr_length += 3; - consume |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - consume |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - consume |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; + if (!next_octet(cursor, buffer, &octet)) + return false; + consume |= ((int) octet) << 24; + + if (!next_octet(cursor, buffer, &octet)) + return false; + consume |= ((int) octet) << 16; + + if (!next_octet(cursor, buffer, &octet)) + return false; + consume |= ((int) octet) << 8; + // Fall through to the next case... case 0xA0 : case 0xC0 : case 0xE0 : hdr_length++; - consume |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; + if (!next_octet(cursor, buffer, &octet)) + return false; + consume |= (int) octet; break; } + if (!advance(cursor, buffer, consume)) + return false; + if (field && !field->parsed) { field->buffer = start_buffer; field->offset = start_cursor - qd_buffer_base(start_buffer); @@ -526,48 +566,58 @@ static int traverse_field(unsigned char **cursor, qd_buffer_t **buffer, qd_field field->tag = tag; } - advance(cursor, buffer, consume); - return 1; + return true; } -static int start_list(unsigned char **cursor, qd_buffer_t **buffer) +static int get_list_count(unsigned char **cursor, qd_buffer_t **buffer) { - unsigned char tag = next_octet(cursor, buffer); - if (!(*cursor)) return 0; - int length = 0; - int count = 0; + unsigned char tag; + unsigned char octet; + + if (!next_octet(cursor, buffer, &tag)) + return 0; + + int count = 0; switch (tag) { case 0x45 : // list0 break; case 0xd0 : // list32 - length |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - length |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - length |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; - length |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; - - count |= ((int) next_octet(cursor, buffer)) << 24; - if (!(*cursor)) return 0; - count |= ((int) next_octet(cursor, buffer)) << 16; - if (!(*cursor)) return 0; - count |= ((int) next_octet(cursor, buffer)) << 8; - if (!(*cursor)) return 0; - count |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; + // + // Advance past the list length + // + if (!advance(cursor, buffer, 4)) + return 0; + + if (!next_octet(cursor, buffer, &octet)) + return 0; + count |= ((int) octet) << 24; + + if (!next_octet(cursor, buffer, &octet)) + return 0; + count |= ((int) octet) << 16; + + if (!next_octet(cursor, buffer, &octet)) + return 0; + count |= ((int) octet) << 8; + + if (!next_octet(cursor, buffer, &octet)) + return 0; + count |= (int) octet; break; case 0xc0 : // list8 - length |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; + // + // Advance past the list length + // + if (!advance(cursor, buffer, 1)) + return 0; - count |= (int) next_octet(cursor, buffer); - if (!(*cursor)) return 0; + if (!next_octet(cursor, buffer, &octet)) + return 0; + count |= (int) octet; break; } @@ -609,14 +659,13 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, const unsigned char *expected_tags, qd_field_location_t *location) { - qd_buffer_t *test_buffer = *buffer; - unsigned char *test_cursor = *cursor; - - if (!test_cursor) + if (!*cursor || !can_advance(cursor, buffer)) return QD_SECTION_NEED_MORE; + qd_buffer_t *test_buffer = *buffer; + unsigned char *test_cursor = *cursor; unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer); - int idx = 0; + int idx = 0; while (idx < pattern_length && *test_cursor == pattern[idx]) { idx++; @@ -656,14 +705,19 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, // Check that the full section is present, if so advance the pointers to // consume the whole section. // - int pre_consume = 1; // Count the already extracted tag + int pre_consume = 1; // Count the already extracted tag uint32_t consume = 0; - unsigned char tag = next_octet(&test_cursor, &test_buffer); + unsigned char tag; + unsigned char octet; + + if (!next_octet(&test_cursor, &test_buffer, &tag)) + return QD_SECTION_NEED_MORE; + unsigned char tag_subcat = tag & 0xF0; // if there is no more data the only valid data type is a null type (0x40), // size is implied as 0 - if (!test_cursor && tag_subcat != 0x40) + if (!can_advance(&test_cursor, &test_buffer) && tag_subcat != 0x40) return QD_SECTION_NEED_MORE; switch (tag_subcat) { @@ -680,12 +734,18 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, case 0xF0: // uint32_t size field: pre_consume += 3; - consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 24; - if (!test_cursor) return QD_SECTION_NEED_MORE; - consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 16; - if (!test_cursor) return QD_SECTION_NEED_MORE; - consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 8; - if (!test_cursor) return QD_SECTION_NEED_MORE; + if (!next_octet(&test_cursor, &test_buffer, &octet)) + return QD_SECTION_NEED_MORE; + consume |= ((uint32_t) octet) << 24; + + if (!next_octet(&test_cursor, &test_buffer, &octet)) + return QD_SECTION_NEED_MORE; + consume |= ((uint32_t) octet) << 16; + + if (!next_octet(&test_cursor, &test_buffer, &octet)) + return QD_SECTION_NEED_MORE; + consume |= ((uint32_t) octet) << 8; + // Fall through to the next case... case 0xA0: @@ -693,14 +753,15 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, case 0xE0: // uint8_t size field pre_consume += 1; - consume |= (uint32_t) next_octet(&test_cursor, &test_buffer); - if (!test_cursor && consume > 0) return QD_SECTION_NEED_MORE; + if (!next_octet(&test_cursor, &test_buffer, &octet)) + return QD_SECTION_NEED_MORE; + consume |= (uint32_t) octet; break; } location->length = pre_consume + consume; if (consume) { - if (advance(&test_cursor, &test_buffer, consume) != 0) { + if (!advance(&test_cursor, &test_buffer, consume)) { return QD_SECTION_NEED_MORE; // whole section not fully received } } @@ -728,7 +789,7 @@ static qd_section_status_t message_section_check(qd_buffer_t **buffer, start = DEQ_NEXT(start); } - location->parsed = 1; + location->parsed = 1; *cursor = test_cursor; *buffer = test_buffer; @@ -786,19 +847,19 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me static const intptr_t offsets[] = { // position of the field's qd_field_location_t in the message content // object - (intptr_t) &((qd_message_content_t *)0)->field_message_id, - (intptr_t) &((qd_message_content_t *)0)->field_user_id, - (intptr_t) &((qd_message_content_t *)0)->field_to, - (intptr_t) &((qd_message_content_t *)0)->field_subject, - (intptr_t) &((qd_message_content_t *)0)->field_reply_to, - (intptr_t) &((qd_message_content_t *)0)->field_correlation_id, - (intptr_t) &((qd_message_content_t *)0)->field_content_type, - (intptr_t) &((qd_message_content_t *)0)->field_content_encoding, - (intptr_t) &((qd_message_content_t *)0)->field_absolute_expiry_time, - (intptr_t) &((qd_message_content_t *)0)->field_creation_time, - (intptr_t) &((qd_message_content_t *)0)->field_group_id, - (intptr_t) &((qd_message_content_t *)0)->field_group_sequence, - (intptr_t) &((qd_message_content_t *)0)->field_reply_to_group_id + (intptr_t) &((qd_message_content_t*) 0)->field_message_id, + (intptr_t) &((qd_message_content_t*) 0)->field_user_id, + (intptr_t) &((qd_message_content_t*) 0)->field_to, + (intptr_t) &((qd_message_content_t*) 0)->field_subject, + (intptr_t) &((qd_message_content_t*) 0)->field_reply_to, + (intptr_t) &((qd_message_content_t*) 0)->field_correlation_id, + (intptr_t) &((qd_message_content_t*) 0)->field_content_type, + (intptr_t) &((qd_message_content_t*) 0)->field_content_encoding, + (intptr_t) &((qd_message_content_t*) 0)->field_absolute_expiry_time, + (intptr_t) &((qd_message_content_t*) 0)->field_creation_time, + (intptr_t) &((qd_message_content_t*) 0)->field_group_id, + (intptr_t) &((qd_message_content_t*) 0)->field_group_sequence, + (intptr_t) &((qd_message_content_t*) 0)->field_reply_to_group_id }; // update table above if new fields need to be accessed: assert(QD_FIELD_MESSAGE_ID <= field && field <= QD_FIELD_REPLY_TO_GROUP_ID); @@ -810,23 +871,27 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me } const int index = field - QD_FIELD_MESSAGE_ID; - qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]); + qd_field_location_t *const location = (qd_field_location_t*) ((char*) content + offsets[index]); if (location->parsed) return location; // requested field not parsed out. Need to parse out up to the requested field: qd_buffer_t *buffer = content->section_message_properties.buffer; unsigned char *cursor = qd_buffer_base(buffer) + content->section_message_properties.offset; - advance(&cursor, &buffer, content->section_message_properties.hdr_length); - if (index >= start_list(&cursor, &buffer)) return 0; // properties list too short + if (!advance(&cursor, &buffer, content->section_message_properties.hdr_length)) + return 0; + if (index >= get_list_count(&cursor, &buffer)) + return 0; // properties list too short int position = 0; while (position < index) { - qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]); - if (f->parsed) - advance(&cursor, &buffer, f->hdr_length + f->length); - else // parse it out - if (!traverse_field(&cursor, &buffer, f)) return 0; + qd_field_location_t *f = (qd_field_location_t*) ((char*) content + offsets[position]); + if (f->parsed) { + if (!advance(&cursor, &buffer, f->hdr_length + f->length)) + return 0; + } else // parse it out + if (!traverse_field(&cursor, &buffer, f)) + return 0; position++; } @@ -1862,7 +1927,7 @@ static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *co return QD_MESSAGE_DEPTH_INCOMPLETE; // no more data is going to come. OK if at the end and optional: - if (!content->parse_cursor && optional) + if (!can_advance(&content->parse_cursor, &content->parse_buffer) && optional) return QD_MESSAGE_DEPTH_OK; // otherwise we've got an invalid (truncated) header @@ -2063,7 +2128,8 @@ qd_iterator_t *qd_message_field_iterator(qd_message_t *msg, qd_message_field_t f qd_buffer_t *buffer = loc->buffer; unsigned char *cursor = qd_buffer_base(loc->buffer) + loc->offset; - advance(&cursor, &buffer, loc->hdr_length); + if (!advance(&cursor, &buffer, loc->hdr_length)) + return 0; return qd_iterator_buffer(buffer, cursor - qd_buffer_base(buffer), loc->length, ITER_VIEW_ALL); } diff --git a/src/message_private.h b/src/message_private.h index 3d3c59e..47f8f3e 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -93,11 +93,11 @@ typedef struct { qd_field_location_t field_group_id; qd_field_location_t field_group_sequence; qd_field_location_t field_reply_to_group_id; - qd_field_location_t body; // The body of the message - qd_buffer_t *parse_buffer; - unsigned char *parse_cursor; - qd_message_depth_t parse_depth; + + qd_buffer_t *parse_buffer; // Pointer to the buffer where parsing should resume, if needed + unsigned char *parse_cursor; // Pointer to octet in parse_buffer where parsing should resume, if needed + qd_message_depth_t parse_depth; // The depth to which this message content has been parsed qd_iterator_t *ma_field_iter_in; // 'message field iterator' for msg.FIELD_MESSAGE_ANNOTATION qd_iterator_pointer_t ma_user_annotation_blob; // Original user annotations --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org