This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 87222386c1bb6f57eb7965c8a49f5a41002af8c4 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Tue Nov 23 13:17:58 2021 -0500 DISPATCH-1403: define a common buffer field API Export the buffer data handling routines in the iterator library as a general API. Inlined for performance. --- include/qpid/dispatch/buffer_field.h | 43 +++++ include/qpid/dispatch/iterator.h | 32 ++-- include/qpid/dispatch/parse.h | 32 ++-- src/buffer_field_api.h | 299 +++++++++++++++++++++++++++++++++++ src/iterator.c | 221 +++++--------------------- src/message.c | 4 +- src/message_private.h | 4 +- src/parse.c | 112 ++++--------- tests/buffer_test.c | 244 ++++++++++++++++++++++++++++ 9 files changed, 683 insertions(+), 308 deletions(-) diff --git a/include/qpid/dispatch/buffer_field.h b/include/qpid/dispatch/buffer_field.h new file mode 100644 index 0000000..b43f26d --- /dev/null +++ b/include/qpid/dispatch/buffer_field.h @@ -0,0 +1,43 @@ +#ifndef __dispatch_buffer_field_h__ +#define __dispatch_buffer_field_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +/** @file + * Data fields spanning multiple buffers + * @internal + * @defgroup buffer_field buffer_field + * @{ + */ + +#include "qpid/dispatch/buffer.h" + + +/* descriptor for a sequence of bytes in a buffer list + */ +typedef struct qd_buffer_field_t qd_buffer_field_t; +struct qd_buffer_field_t { + qd_buffer_t *buffer; // hold start of data + const uint8_t *cursor; // first octet of data + size_t remaining; // length of data +}; + +///@} + +#endif diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h index b730872..157ca5f 100644 --- a/include/qpid/dispatch/iterator.h +++ b/include/qpid/dispatch/iterator.h @@ -19,7 +19,7 @@ * under the License. */ -#include "qpid/dispatch/buffer.h" +#include "qpid/dispatch/buffer_field.h" #include <stdbool.h> #include <stdint.h> @@ -127,12 +127,6 @@ typedef enum { } qd_iterator_view_t; -typedef struct { - qd_buffer_t *buffer; - unsigned char *cursor; - int remaining; -} qd_iterator_pointer_t; - /** @} */ /** \name global * Global Methods @@ -310,18 +304,6 @@ bool qd_iterator_equal(qd_iterator_t *iter, const unsigned char *string); bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix); /** - * Return true iff the prefix string matches the characters addressed by ptr. - * This function ignores octets beyond the length of the prefix. - * Caller's pointer is held constant. - * - * @param ptr buffer chain cursor holding message bytes - * @param skip AMQP housekeeping bytes to skip over before finding the incoming string - * @param prefix the prefix to be matched - * @return true if all bytes of prefix match bytes in user string - */ -bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, const char *prefix); - -/** * Copy the iterator's view into buffer up to a maximum of n bytes. View is * reset to the beginning and cursor is advanced by the number of bytes * copied. There is no trailing '\0' added. 
@@ -458,11 +440,15 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash); * Exposes iter's buffer, cursor, and remaining values. * * @param iter iter that still has data in its view. - * @param ptr Pointer object which is to receive cursor position + * @return a copy of the iter's view cursor + */ +qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter); + +/** + * Construct an iterator from a buffer field */ -void qd_iterator_get_view_cursor( - const qd_iterator_t *iter, - qd_iterator_pointer_t *ptr); +qd_iterator_t *qd_iterator_buffer_field(const qd_buffer_field_t *bfield, + qd_iterator_view_t view); /** @} */ /** @} */ diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h index d47a312..5fe0552 100644 --- a/include/qpid/dispatch/parse.h +++ b/include/qpid/dispatch/parse.h @@ -19,7 +19,7 @@ * under the License. */ -#include "qpid/dispatch/buffer.h" +#include "qpid/dispatch/buffer_field.h" #include "qpid/dispatch/iterator.h" /**@file @@ -48,12 +48,12 @@ DEQ_DECLARE(qd_parsed_turbo_t, qd_parsed_turbo_list_t); */ struct qd_parsed_turbo_t { DEQ_LINKS(qd_parsed_turbo_t); - qd_iterator_pointer_t bufptr; // location/size of field in buffer - uint8_t tag; - uint32_t size; - uint32_t count; - uint32_t length_of_size; - uint32_t length_of_count; + qd_buffer_field_t bufptr; // location/size of field in buffer + uint8_t tag; + uint32_t size; + uint32_t count; + uint32_t length_of_size; + uint32_t length_of_count; }; /** @@ -295,15 +295,15 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k * @param blob_item_count number of map entries referenced by blob_iterator */ void qd_parse_annotations( - bool strip_annotations_in, - qd_iterator_t *ma_iter_in, - qd_parsed_field_t **ma_ingress, - qd_parsed_field_t **ma_phase, - qd_parsed_field_t **ma_to_override, - qd_parsed_field_t **ma_trace, - qd_parsed_field_t **ma_stream, - qd_iterator_pointer_t *blob_pointer, - uint32_t *blob_item_count); + bool 
strip_annotations_in, + qd_iterator_t *ma_iter_in, + qd_parsed_field_t **ma_ingress, + qd_parsed_field_t **ma_phase, + qd_parsed_field_t **ma_to_override, + qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, + qd_buffer_field_t *blob_pointer, + uint32_t *blob_item_count); /** * Identify which annotation is being parsed diff --git a/src/buffer_field_api.h b/src/buffer_field_api.h new file mode 100644 index 0000000..a2ac8cb --- /dev/null +++ b/src/buffer_field_api.h @@ -0,0 +1,299 @@ +#ifndef __dispatch_buffer_field_api_h__ +#define __dispatch_buffer_field_api_h__ 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** @file + * Inline API for common operations on buffer_fields + * @internal + * @{ + */ + +#include "qpid/dispatch/buffer_field.h" +#include <stdbool.h> + +/* qd_buffer_field_normalize + * + * Invariant: a non-empty buffer field's cursor always points to a valid octet, + * never at the end of a non-terminal buffer. Normalizing ensures that + * invariant holds. 
+ */ +static inline void qd_buffer_field_normalize(qd_buffer_field_t *bfield) +{ + assert(bfield); + if (bfield->remaining) { + while (bfield->cursor == qd_buffer_cursor(bfield->buffer)) { + bfield->buffer = DEQ_NEXT(bfield->buffer); + assert(bfield->buffer); // error: remaining value incorrect + bfield->cursor = qd_buffer_base(bfield->buffer); + } + } +} + + +/* qd_buffer_field + * + * Constructor - ensures buffer field is well formed. + */ +static inline qd_buffer_field_t qd_buffer_field(qd_buffer_t *buffer, const uint8_t *cursor, size_t remaining) +{ + assert(buffer); + qd_buffer_field_t bf = {.buffer = buffer, .cursor = cursor, .remaining = remaining}; + qd_buffer_field_normalize(&bf); + return bf; +} + + +/* qd_buffer_field_extend + * + * Increase the size of the field by amount octets + */ +static inline size_t qd_buffer_field_extend(qd_buffer_field_t *bfield, size_t amount) +{ + assert(bfield); + size_t old = bfield->remaining; + bfield->remaining += amount; + if (old == 0) // move cursor to start of new data + qd_buffer_field_normalize(bfield); + return bfield->remaining; +} + + +/* qd_buffer_field_ncopy + * + * Copy up to n octets from bfield to dest, advance bfield by the number of + * octets copied + * + * @return total of octets copied - may be < n if len(bfield) < n + */ +static inline size_t qd_buffer_field_ncopy(qd_buffer_field_t *bfield, uint8_t *dest, size_t n) +{ + assert(bfield); + + const uint8_t * const start = dest; + size_t count = MIN(n, bfield->remaining); + + while (count) { + size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor; + if (count < avail) { + // fastpath: no need to adjust buffer pointers + memcpy(dest, bfield->cursor, count); + dest += count; + bfield->cursor += count; + bfield->remaining -= count; + return dest - start; + } + + memcpy(dest, bfield->cursor, avail); + dest += avail; + count -= avail; + + // count is >= what is available in the current buffer, move to next + + bfield->remaining -= avail; + 
bfield->cursor += avail; + qd_buffer_field_normalize(bfield); + } + + return dest - start; +} + + +/* qd_buffer_field_advance + * + * Move the cursor of bfield forward by amount octets. + * + * @return total of octets skipped - may be < amount if len(bfield) < amount + */ +static inline size_t qd_buffer_field_advance(qd_buffer_field_t *bfield, size_t amount) +{ + assert(bfield); + + const size_t blen = bfield->remaining; + size_t count = MIN(amount, blen); + + while (count > 0) { + size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor; + + if (count < avail) { + // fastpath: no need to adjust buffer pointers + bfield->cursor += count; + bfield->remaining -= count; + break; + } + + // count is >= what is available in the current buffer, move to next + count -= avail; + bfield->remaining -= avail; + bfield->cursor += avail; + qd_buffer_field_normalize(bfield); + } + + return blen - bfield->remaining; +} + + +/* qd_buffer_field_octet + * + * Get the first octet of the field and move the cursor to the next octet (if + * present). bfield length is decremented by 1 + * + * @return true if octet read, false if no octet available (end of field). + */ +static inline bool qd_buffer_field_octet(qd_buffer_field_t *bfield, uint8_t *octet) +{ + assert(bfield); + + if (bfield->remaining) { + assert(bfield->cursor < qd_buffer_cursor(bfield->buffer)); + *octet = *bfield->cursor++; + bfield->remaining -= 1; + qd_buffer_field_normalize(bfield); + return true; + } + return false; +} + + +/* qd_buffer_field_uint32 + * + * Get the next 4 octets of the field and convert them to a uint32 value. Move + * the cursor past the 4 octets and decrement the length by 4. uint32 values + * are used extensively in the AMQP type encodings for meta data (size and + * count). + * + * @return true if uint32 read, false if not enough octets available (end of field). 
+ */ +static inline bool qd_buffer_field_uint32(qd_buffer_field_t *bfield, uint32_t *value) +{ + assert(bfield); + + if (bfield->remaining >= 4) { + uint8_t buf[4]; + qd_buffer_field_ncopy(bfield, buf, 4); + *value = (((uint32_t) buf[0]) << 24) + | (((uint32_t) buf[1]) << 16) + | (((uint32_t) buf[2]) << 8) + | ((uint32_t) buf[3]); + return true; + } + return false; +} + + +/* qd_buffer_field_strdup + * + * Return a null terminated string containing the bfield data. Caller assumes + * responsibility for calling free() on the returned value when finished with + * it. Caller also should ensure the data is actually a value that can be + * rendered as a C string (e.g. no internal zero values). + * + * @return null terminated C string, must be free()ed by caller. + */ +static inline char *qd_buffer_field_strdup(qd_buffer_field_t *bfield) +{ + assert(bfield); + + const size_t len = bfield->remaining + 1; + char *str = qd_malloc(len); + qd_buffer_field_ncopy(bfield, (uint8_t*) str, bfield->remaining); + str[len - 1] = 0; + return str; +} + + +/* qd_buffer_field_equal + * + * Check if the field is exactly equal to count octets of data. If equal + * advance the bfield count octets. 
+ * + * @return true if equal + */ +static inline bool qd_buffer_field_equal(qd_buffer_field_t *bfield, const uint8_t *data, size_t count) +{ + assert(bfield); + + if (bfield->remaining < count) + return false; + + const qd_buffer_field_t save = *bfield; + + while (count) { + + size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor; + + if (count < avail) { + // fastpath: no need to adjust buffer pointers + if (memcmp(data, bfield->cursor, count) != 0) { + *bfield = save; + return false; + } + bfield->cursor += count; + bfield->remaining -= count; + return true; + } + + if (memcmp(data, bfield->cursor, avail) != 0) { + *bfield = save; + return false; + } + + data += avail; + count -= avail; + bfield->remaining -= avail; + bfield->cursor += avail; + qd_buffer_field_normalize(bfield); + } + + return true; +} + + +/* qd_buffer_list_append_field + * + * Copy the contents of bfield to the end of the buflist buffer chain. This + * copies all data - no bfield buffers are moved to buflist. bfield is advanced + * to the end of data. 
+ * + * @return void + */ +static inline void qd_buffer_list_append_field(qd_buffer_list_t *buflist, qd_buffer_field_t *bfield) +{ + assert(buflist); + assert(bfield); + + while (bfield->remaining) { + size_t avail = qd_buffer_cursor(bfield->buffer) - bfield->cursor; + size_t len = MIN(bfield->remaining, avail); + + qd_buffer_list_append(buflist, bfield->cursor, len); + bfield->remaining -= len; + if (!bfield->remaining) { + bfield->cursor += len; + } else { + bfield->buffer = DEQ_NEXT(bfield->buffer); + assert(bfield->buffer); + bfield->cursor = qd_buffer_base(bfield->buffer); + } + } +} + +///@} + +#endif diff --git a/src/iterator.c b/src/iterator.c index 3e46b37..3b34891 100644 --- a/src/iterator.c +++ b/src/iterator.c @@ -23,6 +23,7 @@ #include "qpid/dispatch/amqp.h" #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/hash.h" +#include "buffer_field_api.h" #include <stdio.h> #include <string.h> @@ -51,9 +52,9 @@ ALLOC_DECLARE(qd_hash_segment_t); ALLOC_DEFINE(qd_hash_segment_t); struct qd_iterator_t { - qd_iterator_pointer_t start_pointer; // Pointer to the raw data - qd_iterator_pointer_t view_start_pointer; // Pointer to the start of the view - qd_iterator_pointer_t view_pointer; // Pointer to the remaining view + qd_buffer_field_t start_pointer; // Pointer to the raw data + qd_buffer_field_t view_start_pointer; // Pointer to the start of the view + qd_buffer_field_t view_pointer; // Pointer to the remaining view qd_iterator_view_t view; int annotation_length; int annotation_remaining; @@ -138,7 +139,7 @@ static void parse_address_view(qd_iterator_t *iter) // in order to aid the router in looking up addresses. 
// - qd_iterator_pointer_t save_pointer = iter->view_pointer; + qd_buffer_field_t save_pointer = iter->view_pointer; iter->annotation_length = 1; if (iter->prefix_override == '\0' && qd_iterator_prefix(iter, "_")) { @@ -257,7 +258,7 @@ static void parse_node_view(qd_iterator_t *iter) void qd_iterator_remove_trailing_separator(qd_iterator_t *iter) { // Save the iterator's pointer so we can apply it back before returning from this function. - qd_iterator_pointer_t save_pointer = iter->view_pointer; + qd_buffer_field_t save_pointer = iter->view_pointer; char current_octet = 0; while (!iterator_at_end(iter)) { @@ -301,7 +302,7 @@ static void view_initialize(qd_iterator_t *iter) // state_t state = STATE_START; unsigned int octet; - qd_iterator_pointer_t save_pointer = {0,0,0}; + qd_buffer_field_t save_pointer = {0,0,0}; while (!iterator_at_end(iter) && state != STATE_AT_NODE_ID) { octet = qd_iterator_octet(iter); @@ -387,38 +388,11 @@ static inline int iterator_field_ncopy(qd_iterator_t *iter, unsigned char *buffe { assert(in_field_data(iter)); - const unsigned char *start = buffer; - int count = MIN(n, iter->view_pointer.remaining); if (iter->view_pointer.buffer) { - do { - size_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor; - // optimize: early exit when no need to advance buffer pointers - if (count < avail) { - memcpy(buffer, iter->view_pointer.cursor, count); - iter->view_pointer.cursor += count; - iter->view_pointer.remaining -= count; - return buffer - start + count; - } - // count is >= what is available in the current buffer, move to next - memcpy(buffer, iter->view_pointer.cursor, avail); - buffer += avail; - count -= avail; - iter->view_pointer.cursor += avail; - iter->view_pointer.remaining -= avail; - if (iter->view_pointer.remaining) { - iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer); - if (iter->view_pointer.buffer) { - iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer); - } else { - 
// DISPATCH-1394: field is truncated (remaining is inaccurate!) - iter->view_pointer.remaining = 0; - break; - } - } - } while (count); - return buffer - start; + return qd_buffer_field_ncopy(&iter->view_pointer, (uint8_t*)buffer, n); } else { // string or binary array + int count = MIN(n, iter->view_pointer.remaining); memcpy(buffer, iter->view_pointer.cursor, count); iter->view_pointer.cursor += count; iter->view_pointer.remaining -= count; @@ -436,33 +410,11 @@ static inline void iterator_field_move_cursor(qd_iterator_t *iter, uint32_t leng // prefix assert(in_field_data(iter)); - uint32_t count = MIN(length, iter->view_pointer.remaining); if (iter->view_pointer.buffer) { - do { - uint32_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor; - // optimized: early exit when no need to update iterators buffer pointers - if (count < avail) { - iter->view_pointer.cursor += count; - iter->view_pointer.remaining -= count; - return; - } - // count is >= what is available in the current buffer, move to next - count -= avail; - iter->view_pointer.cursor += avail; - iter->view_pointer.remaining -= avail; - if (iter->view_pointer.remaining) { - iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer); - if (iter->view_pointer.buffer) { - iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer); - } else { - // DISPATCH-1394: field is truncated (remaining is inaccurate!) 
- iter->view_pointer.remaining = 0; - return; - } - } - } while (count); + qd_buffer_field_advance(&iter->view_pointer, length); } else { // string/binary data + uint32_t count = MIN(length, iter->view_pointer.remaining); iter->view_pointer.cursor += count; iter->view_pointer.remaining -= count; } @@ -484,42 +436,7 @@ static inline bool iterator_field_equal(qd_iterator_t *iter, const unsigned char return false; if (iter->view_pointer.buffer) { - - qd_iterator_pointer_t save_pointer = iter->view_pointer; - - do { - size_t avail = qd_buffer_cursor(iter->view_pointer.buffer) - iter->view_pointer.cursor; - // optimized: early exit when no need to update iterators buffer pointers - if (count < avail) { - if (memcmp(buffer, iter->view_pointer.cursor, count) != 0) { - iter->view_pointer = save_pointer; - return false; - } - iter->view_pointer.cursor += count; - iter->view_pointer.remaining -= count; - return true; - } - // count is >= what is available in the current buffer - if (memcmp(buffer, iter->view_pointer.cursor, avail) != 0) { - iter->view_pointer = save_pointer; - return false; - } - - buffer += avail; - count -= avail; - iter->view_pointer.cursor += avail; - iter->view_pointer.remaining -= avail; - if (iter->view_pointer.remaining) { - iter->view_pointer.buffer = DEQ_NEXT(iter->view_pointer.buffer); - if (iter->view_pointer.buffer) { - iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer); - } else { - // DISPATCH-1394: field is truncated (remaining is inaccurate!) 
- iter->view_pointer = save_pointer; - return false; - } - } - } while (count); + return qd_buffer_field_equal(&iter->view_pointer, (const uint8_t*) buffer, count); } else { // string or binary array @@ -624,10 +541,8 @@ qd_iterator_t *qd_iterator_buffer(qd_buffer_t *buffer, int offset, int length, q return 0; ZERO(iter); - iter->start_pointer.buffer = buffer; - iter->start_pointer.cursor = qd_buffer_base(buffer) + offset; - iter->start_pointer.remaining = length; - iter->phase = '0'; + iter->start_pointer = qd_buffer_field(buffer, qd_buffer_base(buffer) + offset, length); + iter->phase = '0'; qd_iterator_reset_view(iter, view); @@ -739,18 +654,14 @@ unsigned char qd_iterator_octet(qd_iterator_t *iter) if (iter->view_pointer.remaining == 0) return (unsigned char) 0; - unsigned char result = *(iter->view_pointer.cursor); - - // we know remaining > 0, so we can simply move the cursor - - iter->view_pointer.cursor++; - - // the slow path: if we've moved "off" the end, simply advance to the next buffer - if (--iter->view_pointer.remaining - && iter->view_pointer.buffer - && qd_buffer_cursor(iter->view_pointer.buffer) == iter->view_pointer.cursor) { - iter->view_pointer.buffer = iter->view_pointer.buffer->next; - iter->view_pointer.cursor = qd_buffer_base(iter->view_pointer.buffer); + unsigned char result; + if (iter->view_pointer.buffer) { + uint8_t octet; + (void)qd_buffer_field_octet(&iter->view_pointer, &octet); + result = (unsigned char) octet; + } else { // string or binary array + result = *(iter->view_pointer.cursor)++; + --iter->view_pointer.remaining; } if (iter->mode == MODE_TO_SLASH && iter->view_pointer.remaining && *(iter->view_pointer.cursor) == '/') { @@ -878,8 +789,8 @@ bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix) if (!iter) return false; - qd_iterator_pointer_t save_pointer = iter->view_pointer; - unsigned char *c = (unsigned char*) prefix; + qd_buffer_field_t save_pointer = iter->view_pointer; + unsigned char *c = (unsigned char*) 
prefix; while(*c) { if (*c != qd_iterator_octet(iter)) @@ -896,69 +807,6 @@ bool qd_iterator_prefix(qd_iterator_t *iter, const char *prefix) } -// bare bones copy of field_iterator_move_cursor with no field/view baggage -void iterator_pointer_move_cursor(qd_iterator_pointer_t *ptr, uint32_t length) -{ - uint32_t count = length > ptr->remaining ? ptr->remaining : length; - - while (count) { - uint32_t remaining = qd_buffer_cursor(ptr->buffer) - ptr->cursor; - remaining = remaining > count ? count : remaining; - ptr->cursor += remaining; - ptr->remaining -= remaining; - count -= remaining; - if (ptr->cursor == qd_buffer_cursor(ptr->buffer)) { - ptr->buffer = ptr->buffer->next; - if (ptr->buffer == 0) { - ptr->remaining = 0; - ptr->cursor = 0; - break; - } else { - ptr->cursor = qd_buffer_base(ptr->buffer); - } - } - } -} - - -// bare bones copy of qd_iterator_prefix with no iterator baggage -bool qd_iterator_prefix_ptr(const qd_iterator_pointer_t *ptr, uint32_t skip, const char *prefix) -{ - if (!ptr) - return false; - - // if ptr->buffer holds enough bytes for the comparison then - // don't fiddle with the iterator motions. Just do the comparison directly. - const int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor; - if (avail >= skip + QD_MA_PREFIX_LEN) { - // there's enough in current buffer to do straight compare - const void * blk1 = ptr->cursor + skip; - const void * blk2 = prefix; - return memcmp(blk1, blk2, QD_MA_PREFIX_LEN) == 0; - } - - // otherwise compare across buffer boundaries - // this, too, could be optimized a bit - qd_iterator_pointer_t lptr; - *&lptr = *ptr; - - iterator_pointer_move_cursor(&lptr, skip); - - unsigned char *c = (unsigned char*) prefix; - while(*c && lptr.remaining) { - unsigned char ic = *lptr.cursor; - - if (*c != ic) - break; - c++; - - iterator_pointer_move_cursor(&lptr, 1); - } - - return *c == 0; -} - - int qd_iterator_length(const qd_iterator_t *iter) { return iter ? 
iterator_length(iter) : 0; @@ -1135,13 +983,9 @@ bool qd_iterator_next_segment(qd_iterator_t *iter, uint32_t *hash) } -void qd_iterator_get_view_cursor( - const qd_iterator_t *iter, - qd_iterator_pointer_t *ptr) +qd_buffer_field_t qd_iterator_get_view_cursor(const qd_iterator_t *iter) { - ptr->buffer = iter->view_pointer.buffer; - ptr->cursor = iter->view_pointer.cursor; - ptr->remaining = iter->view_pointer.remaining; + return iter->view_pointer; } @@ -1154,3 +998,16 @@ void qd_iterator_finalize(void) my_area = 0; my_router = 0; } + + +qd_iterator_t *qd_iterator_buffer_field(const qd_buffer_field_t *bfield, + qd_iterator_view_t view) +{ + assert(bfield); + qd_buffer_field_t copy = *bfield; + qd_buffer_field_normalize(&copy); + return qd_iterator_buffer(copy.buffer, + copy.cursor - qd_buffer_base(copy.buffer), + copy.remaining, + view); +} diff --git a/src/message.c b/src/message.c index e35a14f..1a79fe2 100644 --- a/src/message.c +++ b/src/message.c @@ -1190,8 +1190,8 @@ void qd_message_message_annotations(qd_message_t *in_msg) // Construct pseudo-field location of user annotations blob // This holds all annotations if no router-specific annotations are present if (content->ma_count > 0) { - qd_field_location_t *cf = &content->field_user_annotations; - qd_iterator_pointer_t *uab = &content->ma_user_annotation_blob; + qd_field_location_t *cf = &content->field_user_annotations; + qd_buffer_field_t *uab = &content->ma_user_annotation_blob; cf->buffer = uab->buffer; cf->offset = uab->cursor - qd_buffer_base(uab->buffer); cf->length = uab->remaining; diff --git a/src/message_private.h b/src/message_private.h index 944612e..fff1b17 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -120,7 +120,7 @@ typedef struct { qd_message_depth_t parse_depth; // Depth to which message content has been parsed qd_iterator_t *ma_field_iter_in; // Iter for msg.FIELD_MESSAGE_ANNOTATION - qd_iterator_pointer_t ma_user_annotation_blob; // Original user annotations + 
qd_buffer_field_t ma_user_annotation_blob; // Original user annotations // with router annotations stripped uint32_t ma_count; // Number of map elements in blob // after router fields stripped @@ -152,7 +152,7 @@ typedef struct { } qd_message_content_t; struct qd_message_pvt_t { - qd_iterator_pointer_t cursor; // Pointer to current location of outgoing byte stream. + qd_buffer_field_t cursor; // Pointer to current location of outgoing byte stream. qd_message_depth_t message_depth; // Depth of incoming received message qd_message_depth_t sent_depth; // Depth of outgoing sent message qd_message_content_t *content; // Singleton content shared by reference between diff --git a/src/parse.c b/src/parse.c index 9ebbef7..3f43e5f 100644 --- a/src/parse.c +++ b/src/parse.c @@ -22,6 +22,7 @@ #include "qpid/dispatch/alloc.h" #include "qpid/dispatch/amqp.h" #include "qpid/dispatch/ctools.h" +#include "buffer_field_api.h" #include <assert.h> #include <inttypes.h> @@ -237,7 +238,7 @@ const char *qd_parse_turbo(qd_iterator_t *iter, ZERO(turbo); // Get the buffer pointers for the map element - qd_iterator_get_view_cursor(iter, &turbo->bufptr); + turbo->bufptr = qd_iterator_get_view_cursor(iter); // Get description of the map element parse_error = get_type_info(iter, &turbo->tag, &turbo->size, &turbo->count, @@ -259,7 +260,9 @@ const char *qd_parse_turbo(qd_iterator_t *iter, for (int idx=0; idx < n_allocs; idx += 2) { qd_parsed_turbo_t *turbo = DEQ_HEAD(*annos); assert(turbo); - if (qd_iterator_prefix_ptr(&turbo->bufptr, turbo->length_of_size + 1, QD_MA_PREFIX)) + qd_buffer_field_t key = turbo->bufptr; + qd_buffer_field_advance(&key, turbo->length_of_size + 1); + if (qd_buffer_field_equal(&key, (const uint8_t*) QD_MA_PREFIX, QD_MA_PREFIX_LEN)) break; // leading anno is a user annotation map key @@ -714,73 +717,16 @@ qd_parsed_field_t *qd_parse_value_by_key(qd_parsed_field_t *field, const char *k } -// TODO(kgiusti) - de-duplicate all the buffer chain walking code! 
-// See DISPATCH-1403 -// -static inline int _turbo_advance(qd_iterator_pointer_t *ptr, int length) -{ - const int start = ptr->remaining; - int move = MIN(length, ptr->remaining); - while (move > 0) { - int avail = qd_buffer_cursor(ptr->buffer) - ptr->cursor; - if (move < avail) { - ptr->cursor += move; - ptr->remaining -= move; - break; - } - move -= avail; - ptr->remaining -= avail; - if (ptr->remaining == 0) { - ptr->cursor += avail; // move to end - break; - } - - // More remaining in buffer chain: advance to next buffer in chain - assert(DEQ_NEXT(ptr->buffer)); - if (!DEQ_NEXT(ptr->buffer)) { - // this is an error! ptr->remainer is not accurate. This should not happen - // since the MA field must be completely received at this point - // (see DISPATCH-1394). - int copied = start - ptr->remaining; - ptr->remaining = 0; - ptr->cursor += avail; // force to end of chain - return copied; - } - ptr->buffer = DEQ_NEXT(ptr->buffer); - ptr->cursor = qd_buffer_base(ptr->buffer); - } - return start - ptr->remaining; -} - - -// TODO(kgiusti): deduplicate! 
-// See DISPATCH-1403 -// -static inline int _turbo_copy(qd_iterator_pointer_t *ptr, char *buffer, int length) -{ - int move = MIN(length, ptr->remaining); - char * const start = buffer; - while (ptr->remaining && move > 0) { - int avail = MIN(move, qd_buffer_cursor(ptr->buffer) - ptr->cursor); - memcpy(buffer, ptr->cursor, avail); - buffer += avail; - move -= avail; - _turbo_advance(ptr, avail); - } - return (buffer - start); -} - - const char *qd_parse_annotations_v1( - bool strip_anno_in, - qd_iterator_t *ma_iter_in, - qd_parsed_field_t **ma_ingress, - qd_parsed_field_t **ma_phase, - qd_parsed_field_t **ma_to_override, - qd_parsed_field_t **ma_trace, - qd_parsed_field_t **ma_stream, - qd_iterator_pointer_t *blob_pointer, - uint32_t *blob_item_count) + bool strip_anno_in, + qd_iterator_t *ma_iter_in, + qd_parsed_field_t **ma_ingress, + qd_parsed_field_t **ma_phase, + qd_parsed_field_t **ma_to_override, + qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, + qd_buffer_field_t *blob_pointer, + uint32_t *blob_item_count) { // Do full parse qd_iterator_reset(ma_iter_in); @@ -804,7 +750,7 @@ const char *qd_parse_annotations_v1( if (!strip_anno_in) { anno = DEQ_HEAD(annos); while (anno) { - uint8_t * dp; // pointer to key name in raw buf or extract buf + const uint8_t *dp; // pointer to key name in raw buf or extract buf char key_name[QD_MA_MAX_KEY_LEN]; // key name extracted across buf boundary int key_len = anno->size; @@ -814,12 +760,12 @@ const char *qd_parse_annotations_v1( dp = anno->bufptr.cursor + anno->length_of_size + 1; } else { // Pull the key name from multiple buffers - qd_iterator_pointer_t wbuf = anno->bufptr; // scratch buf pointers for getting key - _turbo_advance(&wbuf, anno->length_of_size + 1); + qd_buffer_field_t wbuf = anno->bufptr; // scratch buf pointers for getting key + qd_buffer_field_advance(&wbuf, anno->length_of_size + 1); int t_size = MIN(anno->size, QD_MA_MAX_KEY_LEN); // get this many total - key_len = _turbo_copy(&wbuf, 
key_name, t_size); + key_len = qd_buffer_field_ncopy(&wbuf, (uint8_t*) key_name, t_size); - dp = (uint8_t *)key_name; + dp = (const uint8_t *)key_name; } // Verify that the key starts with the prefix. @@ -924,15 +870,15 @@ const char *qd_parse_annotations_v1( void qd_parse_annotations( - bool strip_annotations_in, - qd_iterator_t *ma_iter_in, - qd_parsed_field_t **ma_ingress, - qd_parsed_field_t **ma_phase, - qd_parsed_field_t **ma_to_override, - qd_parsed_field_t **ma_trace, - qd_parsed_field_t **ma_stream, - qd_iterator_pointer_t *blob_pointer, - uint32_t *blob_item_count) + bool strip_annotations_in, + qd_iterator_t *ma_iter_in, + qd_parsed_field_t **ma_ingress, + qd_parsed_field_t **ma_phase, + qd_parsed_field_t **ma_to_override, + qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, + qd_buffer_field_t *blob_pointer, + uint32_t *blob_item_count) { *ma_ingress = 0; *ma_phase = 0; @@ -964,7 +910,7 @@ void qd_parse_annotations( // If there are no router annotations then all annotations // are the user's opaque blob. 
- qd_iterator_get_view_cursor(raw_iter, blob_pointer); + *blob_pointer = qd_iterator_get_view_cursor(raw_iter); qd_iterator_free(raw_iter); diff --git a/tests/buffer_test.c b/tests/buffer_test.c index 8ad38e7..95fd7cc 100644 --- a/tests/buffer_test.c +++ b/tests/buffer_test.c @@ -19,6 +19,7 @@ #define _GNU_SOURCE #include "qpid/dispatch/buffer.h" +#include "buffer_field_api.h" #include "test_case.h" @@ -125,6 +126,248 @@ static char *test_buffer_list_append(void *context) } +static char *test_buffer_field(void *context) +{ + char *result = 0; + static const uint8_t data1[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + static const uint8_t data2[10] = {0xF9, 0xF8, 0xF7, 0xF6, 0xF5, 0xF4, 0xF3, 0xF2, 0xF1, 0xF0}; + qd_buffer_list_t list; + qd_buffer_list_t other_list; + qd_buffer_field_t bfield; + + DEQ_INIT(list); + DEQ_INIT(other_list); + + // test buffer list (2000 octets): + for (int i = 0; i < 100; ++i) { + qd_buffer_list_t tmp; + DEQ_INIT(tmp); + fill_buffer(&tmp, (unsigned char *) data1, 10); + qd_buffer_t *b = qd_buffer(); + DEQ_INSERT_TAIL(tmp, b); // empty buffer + DEQ_APPEND(list, tmp); + fill_buffer(&tmp, (unsigned char *) data2, 10); + DEQ_APPEND(list, tmp); + } + + // verify octet read + + bfield.buffer = DEQ_HEAD(list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = 2000; + + int total_octets = 0; + size_t expected_length = 2000; + uint8_t next_octet = 0; + uint8_t octet = 0xFF; + while (qd_buffer_field_octet(&bfield, &octet)) { + total_octets += 1; + expected_length -= 1; + + if (bfield.remaining != expected_length) { + result = "octet length not updated"; + goto exit; + } + if (octet != next_octet) { + result = "Unexpected next octet"; + goto exit; + } + if (next_octet == 0x09) + next_octet = 0xF9; + else if (next_octet == 0xF0) + next_octet = 0; + else if (next_octet < 0x09) + next_octet += 1; + else + next_octet -= 1; + } + + if (total_octets != 2000 || bfield.remaining != 0) { + result = "Next octet wrong length"; + goto exit; + } + 
+ // verify advance + + bfield.buffer = DEQ_HEAD(list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = 2000; + + size_t amount = qd_buffer_field_advance(&bfield, 2); + if (amount != 2) { + result = "advance 2 failed"; + goto exit; + } + + if (!qd_buffer_field_octet(&bfield, &octet) || octet != 2) { + result = "expected to advance to '2'"; + goto exit; + } + + amount = qd_buffer_field_advance(&bfield, 1995); + if (amount != 1995) { + result = "advance 1995 failed"; + goto exit; + } + + if (bfield.remaining != 2) { + result = "expected 2 last octets"; + goto exit; + } + + if (!qd_buffer_field_octet(&bfield, &octet) || octet != 0xF1) { + result = "expected to advance to '0xF1'"; + goto exit; + } + + amount = qd_buffer_field_advance(&bfield, 3); + if (amount != 1 || bfield.remaining != 0) { + result = "failed to advance to end of field"; + goto exit; + } + + // verify ncopy + + bfield.buffer = DEQ_HEAD(list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = 2000; + + uint8_t dest[10]; + amount = qd_buffer_field_ncopy(&bfield, dest, 5); + if (amount != 5) { + result = "failed to ncopy 5"; + goto exit; + } + if (memcmp(dest, data1, 5)) { + result = "ncopy 5 failed"; + goto exit; + } + amount = qd_buffer_field_ncopy(&bfield, dest, 10); + if (amount != 10) { + result = "failed to ncopy 10"; + goto exit; + } + if (memcmp(dest, &data1[5], 5) || memcmp(&dest[5], &data2[0], 5)) { + result = "ncopy 10 failed"; + goto exit; + } + amount = qd_buffer_field_advance(&bfield, 1980); + if (amount != 1980) { + result = "advance 1980 failed"; + goto exit; + } + amount = qd_buffer_field_ncopy(&bfield, dest, 10); + if (amount != 5) { + result = "ncopy expected 5 failed"; + goto exit; + } + if (memcmp(dest, &data2[5], 5) || bfield.remaining != 0) { + result = "ncopy at end failed"; + goto exit; + } + + // verify equal + + bfield.buffer = DEQ_HEAD(list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = 2000; + + const uint8_t 
pattern[] = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\xF9\xF8\xF7\xF6\xF5\xF4\xF3\xF2\xF1\xF0"; + const uint8_t pattern_bad[] = "\xF9\xF8\xF7\xF6\xF5\xF4\xF3\xF2\xF1\xF0\xAA"; + if (qd_buffer_field_equal(&bfield, (uint8_t*) "\x00\x01\x03", 3)) { + result = "expected equal 3 to fail"; + goto exit; + } + if (bfield.remaining != 2000) { + result = "do not advance on failed equal"; + goto exit; + } + if (!qd_buffer_field_equal(&bfield, pattern, 20)) { + result = "expected pattern match"; + goto exit; + } + if (bfield.remaining != 1980) { + result = "match did not advance"; + goto exit; + } + (void)qd_buffer_field_advance(&bfield, 1960); + if (!qd_buffer_field_equal(&bfield, pattern, 10)) { + result = "expected sub pattern match"; + goto exit; + } + if (qd_buffer_field_equal(&bfield, pattern_bad, 11)) { + result = "did not expect sub pattern match"; + goto exit; + } + if (bfield.remaining != 10) { + result = "mismatch advanced"; + goto exit; + } + if (!qd_buffer_field_equal(&bfield, &pattern[10], 9 )) { + result = "expected end sub pattern match"; + goto exit; + } + + if (!qd_buffer_field_octet(&bfield, &octet) || octet != 0xF0) { + result = "failed to octet read the extra trailing octet in the pattern"; + } + + // verify buffer list append + + bfield.buffer = DEQ_HEAD(list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = 2000; + + qd_buffer_field_t saved_bfield = bfield; + qd_buffer_t *bptr = 0; + + qd_buffer_list_append_field(&other_list, &bfield); + if (bfield.remaining) { + result = "expected to append 2000 octets"; + goto exit; + } + bptr = DEQ_HEAD(other_list); + uint32_t cmp_count = 0; + while (bptr) { + if (!qd_buffer_field_equal(&saved_bfield, qd_buffer_base(bptr), qd_buffer_size(bptr))) { + result = "expected list and buffers to be equal"; + goto exit; + } + cmp_count += qd_buffer_size(bptr); + bptr = DEQ_NEXT(bptr); + } + + if (saved_bfield.remaining != 0) { + result = "expected saved_bfield to be empty"; + goto exit; + } + + if 
(cmp_count != 2000) { + result = "did not compare 2000 octets"; + goto exit; + } + + qd_buffer_list_free_buffers(&other_list); + + const char *append_str = "abcdefghijklmnopqrstuvwxyz"; + qd_buffer_list_append(&other_list, (const uint8_t *)append_str, strlen(append_str)); + + bfield.buffer = DEQ_HEAD(other_list); + bfield.cursor = qd_buffer_base(bfield.buffer); + bfield.remaining = strlen(append_str); + + if (!qd_buffer_field_equal(&bfield, (const uint8_t*) append_str, strlen(append_str))) { + result = "expected to equal append_str"; + goto exit; + } + +exit: + qd_buffer_list_free_buffers(&list); + qd_buffer_list_free_buffers(&other_list); + return result; +} + + int buffer_tests() { int result = 0; @@ -132,6 +375,7 @@ int buffer_tests() TEST_CASE(test_buffer_list_clone, 0); TEST_CASE(test_buffer_list_append, 0); + TEST_CASE(test_buffer_field, 0); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org