This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit a8e548bfca44851305a8b6a59cfb21272ca668ca Author: Gordon Sim <g...@redhat.com> AuthorDate: Wed Oct 7 12:09:57 2020 +0100 DISPATCH-1654: fix for streaming message --- include/qpid/dispatch/amqp.h | 2 ++ include/qpid/dispatch/message.h | 17 +++++++++++ include/qpid/dispatch/parse.h | 2 ++ src/adaptors/tcp_adaptor.c | 3 ++ src/amqp.c | 1 + src/message.c | 28 +++++++++++++++++- src/message_private.h | 2 ++ src/parse.c | 12 +++++++- src/router_node.c | 64 ++++++++++++++++++++--------------------- 9 files changed, 97 insertions(+), 34 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index aca440c..6c1064f 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -112,6 +112,7 @@ extern const char * const QD_MA_TRACE; ///< Trace extern const char * const QD_MA_TO; ///< To-Override extern const char * const QD_MA_PHASE; ///< Phase for override address extern const char * const QD_MA_CLASS; ///< Message-Class +extern const char * const QD_MA_STREAM; ///< Indicate streaming message #define QD_MA_PREFIX_LEN (9) #define QD_MA_INGRESS_LEN (16) @@ -119,6 +120,7 @@ extern const char * const QD_MA_CLASS; ///< Message-Class #define QD_MA_TO_LEN (11) #define QD_MA_PHASE_LEN (14) #define QD_MA_CLASS_LEN (14) +#define QD_MA_STREAM_LEN (15) extern const int QD_MA_MAX_KEY_LEN; ///< strlen of longest key name extern const int QD_MA_N_KEYS; ///< number of router annotation keys diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 9978d1c..b797eca 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -197,6 +197,23 @@ void qd_message_set_phase_annotation(qd_message_t *msg, int phase); int qd_message_get_phase_annotation(const qd_message_t *msg); /** + * Indicate whether message should be considered to be streaming. + * + * @param msg Pointer to an outgoing message. 
+ * @param stream true if the message is streaming + * + */ +void qd_message_set_stream_annotation(qd_message_t *msg, bool stream); +/** + * Test whether received message should be considered to be streaming. + * + * @param msg Pointer to a received message. + * @return true if the received message has the streaming annotation set, else false. + * + */ +int qd_message_is_streaming(qd_message_t *msg); + +/** * Set the value for the QD_MA_INGRESS field in the outgoing message * annotations for the message. * diff --git a/include/qpid/dispatch/parse.h b/include/qpid/dispatch/parse.h index 7fed15d..f6e5fd7 100644 --- a/include/qpid/dispatch/parse.h +++ b/include/qpid/dispatch/parse.h @@ -301,6 +301,7 @@ void qd_parse_annotations( qd_parsed_field_t **ma_phase, qd_parsed_field_t **ma_to_override, qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, qd_iterator_pointer_t *blob_pointer, uint32_t *blob_item_count); @@ -312,6 +313,7 @@ typedef enum { QD_MAE_TRACE, QD_MAE_TO, QD_MAE_PHASE, + QD_MAE_STREAM, QD_MAE_NONE } qd_ma_enum_t; diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index f5d0ac7..cd22384 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -145,6 +145,8 @@ static int handle_incoming(qdr_tcp_connection_t *conn) } else { qd_message_t *msg = qd_message(); + qd_message_set_stream_annotation(msg, true); + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); qd_compose_start_list(props); qd_compose_insert_null(props); // message-id @@ -966,6 +968,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) adaptor->log_source = qd_log_source("TCP_ADAPTOR"); DEQ_INIT(adaptor->listeners); DEQ_INIT(adaptor->connectors); + DEQ_INIT(adaptor->connections); *adaptor_context = adaptor; tcp_adaptor = adaptor; diff --git a/src/amqp.c b/src/amqp.c index 7c9d9c0..8da6db2 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -30,6 +30,7 @@ const char * const QD_MA_TRACE = "x-opt-qd.trace"; const 
char * const QD_MA_TO = "x-opt-qd.to"; const char * const QD_MA_PHASE = "x-opt-qd.phase"; const char * const QD_MA_CLASS = "x-opt-qd.class"; +const char * const QD_MA_STREAM = "x-opt-qd.stream"; const int QD_MA_MAX_KEY_LEN = 16; const int QD_MA_N_KEYS = 5; // max number of router annotations to send/receive const int QD_MA_FILTER_LEN = 5; // N tailing inbound entries to search for stripping diff --git a/src/message.c b/src/message.c index a3a8152..360fd84 100644 --- a/src/message.c +++ b/src/message.c @@ -1100,6 +1100,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace); qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress); copy->ma_phase = msg->ma_phase; + copy->ma_stream = msg->ma_stream; copy->strip_annotations_in = msg->strip_annotations_in; copy->content = content; @@ -1131,6 +1132,7 @@ void qd_message_message_annotations(qd_message_t *in_msg) if (content->ma_field_iter_in == 0) return; + qd_parsed_field_t *ma_pf_stream = 0; qd_parse_annotations( msg->strip_annotations_in, content->ma_field_iter_in, @@ -1138,6 +1140,7 @@ void qd_message_message_annotations(qd_message_t *in_msg) &content->ma_pf_phase, &content->ma_pf_to_override, &content->ma_pf_trace, + &ma_pf_stream, &content->ma_user_annotation_blob, &content->ma_count); @@ -1157,6 +1160,10 @@ void qd_message_message_annotations(qd_message_t *in_msg) content->ma_int_phase = qd_parse_as_int(content->ma_pf_phase); } + if (ma_pf_stream) { + content->ma_stream = qd_parse_as_int(ma_pf_stream); + } + return; } @@ -1189,6 +1196,12 @@ int qd_message_get_phase_annotation(const qd_message_t *in_msg) return msg->ma_phase; } +void qd_message_set_stream_annotation(qd_message_t *in_msg, bool stream) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + msg->ma_stream = stream; +} + void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; @@ -1613,7 +1626,8 @@ static void 
compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list if (!DEQ_IS_EMPTY(msg->ma_to_override) || !DEQ_IS_EMPTY(msg->ma_trace) || !DEQ_IS_EMPTY(msg->ma_ingress) || - msg->ma_phase != 0) { + msg->ma_phase != 0 || + msg->ma_stream) { if (!map_started) { qd_compose_start_map(out_ma); @@ -1643,6 +1657,12 @@ static void compose_message_annotations_v1(qd_message_pvt_t *msg, qd_buffer_list qd_compose_insert_int(field, msg->ma_phase); field_count++; } + + if (msg->ma_stream) { + qd_compose_insert_symbol(field, QD_MA_STREAM); + qd_compose_insert_int(field, msg->ma_stream); + field_count++; + } // pad out to N fields for (; field_count < QD_MA_N_KEYS; field_count++) { qd_compose_insert_symbol(field, QD_MA_PREFIX); @@ -2126,6 +2146,7 @@ qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_ qd_message_depth_status_t result; LOCK(content->lock); + //printf("qd_message_check_depth(%p, %i)\n", (void*) in_msg, depth); result = qd_message_check_LH(content, depth); UNLOCK(content->lock); return result; @@ -2671,6 +2692,11 @@ int qd_message_get_phase_val(qd_message_t *msg) return ((qd_message_pvt_t*)msg)->content->ma_int_phase; } +int qd_message_is_streaming(qd_message_t *msg) +{ + return ((qd_message_pvt_t*)msg)->content->ma_stream; +} + void qd_message_Q2_holdoff_disable(qd_message_t *msg) { diff --git a/src/message_private.h b/src/message_private.h index 02f002c..a6ca077 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -121,6 +121,7 @@ typedef struct { qd_parsed_field_t *ma_pf_to_override; qd_parsed_field_t *ma_pf_trace; int ma_int_phase; + bool ma_stream; uint64_t max_message_size; // configured max; 0 if no max to enforce uint64_t bytes_received; // bytes returned by pn_link_recv() when enforcing max_message_size uint32_t fanout; // The number of receivers for this message, including in-process subscribers. 
@@ -148,6 +149,7 @@ struct qd_message_pvt_t { qd_buffer_list_t ma_trace; // trace list in outgoing message annotations qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations int ma_phase; // phase for the override address + bool ma_stream; // indicates whether this message is streaming qd_message_body_data_list_t body_data_list; // TODO - move this to the content for one-time parsing (TLR) qd_message_body_data_t *next_body_data; unsigned char *body_cursor; diff --git a/src/parse.c b/src/parse.c index 25d303d..3b43c10 100644 --- a/src/parse.c +++ b/src/parse.c @@ -777,6 +777,7 @@ const char *qd_parse_annotations_v1( qd_parsed_field_t **ma_phase, qd_parsed_field_t **ma_to_override, qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, qd_iterator_pointer_t *blob_pointer, uint32_t *blob_item_count) { @@ -850,6 +851,11 @@ const char *qd_parse_annotations_v1( ma_type = QD_MAE_INGRESS; } break; + case QD_MA_STREAM_LEN: + if (memcmp(QD_MA_STREAM + QMPL, dp, QD_MA_STREAM_LEN - QMPL) == 0) { + ma_type = QD_MAE_STREAM; + } + break; default: // padding annotations are ignored here break; @@ -885,6 +891,9 @@ const char *qd_parse_annotations_v1( case QD_MAE_PHASE: *ma_phase = val_field; break; + case QD_MAE_STREAM: + *ma_stream = val_field; + break; case QD_MAE_NONE: assert(false); break; @@ -920,6 +929,7 @@ void qd_parse_annotations( qd_parsed_field_t **ma_phase, qd_parsed_field_t **ma_to_override, qd_parsed_field_t **ma_trace, + qd_parsed_field_t **ma_stream, qd_iterator_pointer_t *blob_pointer, uint32_t *blob_item_count) { @@ -958,7 +968,7 @@ void qd_parse_annotations( qd_iterator_free(raw_iter); (void) qd_parse_annotations_v1(strip_annotations_in, ma_iter_in, ma_ingress, ma_phase, - ma_to_override, ma_trace, + ma_to_override, ma_trace, ma_stream, blob_pointer, blob_item_count); return; diff --git a/src/router_node.c b/src/router_node.c index 956ce77..aa4cf04 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -491,38 +491,6 @@ static 
bool AMQP_rx_handler(void* context, qd_link_t *link) } // - // Head of line blocking avoidance (DISPATCH-1545) - // - // Before we can forward a message we need to determine whether or not this - // message is "streaming" - a large message that has the potential to block - // other messages sharing the trunk link. At this point we cannot for sure - // know the actual length of the incoming message, so we employ the - // following heuristic to determine if the message is "streaming": - // - // - If the message is receive-complete it is NOT a streaming message. - // - If it is NOT receive-complete: - // Continue buffering incoming data until: - // - receive has completed => NOT a streaming message - // - not rx-complete AND Q2 threshold hit => a streaming message - // - // Once Q2 is hit we MUST forward the message regardless of rx-complete - // since Q2 will block forever unless the incoming data is drained via - // forwarding. - // - if (!receive_complete) { - if (qd_message_is_Q2_blocked(msg)) { - qd_log(router->log_source, QD_LOG_DEBUG, - "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s", - conn->connection_id, - qd_link_link_id(link), - conn->user_id); - } else { - // Continue buffering this message - return false; - } - } - - // // Determine if the incoming link is anonymous. If the link is addressed, // there are some optimizations we can take advantage of. // @@ -607,6 +575,38 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) qd_iterator_t *ingress_iter = router_annotate_message(router, msg, &link_exclusions, &distance, &ingress_index); // + // Head of line blocking avoidance (DISPATCH-1545) + // + // Before we can forward a message we need to determine whether or not this + // message is "streaming" - a large message that has the potential to block + // other messages sharing the trunk link. 
At this point we cannot for sure + // know the actual length of the incoming message, so we employ the + // following heuristic to determine if the message is "streaming": + // + // - If the message is receive-complete it is NOT a streaming message. + // - If it is NOT receive-complete: + // Continue buffering incoming data until: + // - receive has completed => NOT a streaming message + // - not rx-complete AND Q2 threshold hit => a streaming message + // + // Once Q2 is hit we MUST forward the message regardless of rx-complete + // since Q2 will block forever unless the incoming data is drained via + // forwarding. + // + if (!receive_complete) { + if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) { + qd_log(router->log_source, QD_LOG_DEBUG, + "[C%"PRIu64" L%"PRIu64"] Incoming message classified as streaming. User:%s", + conn->connection_id, + qd_link_link_id(link), + conn->user_id); + } else { + // Continue buffering this message + return false; + } + } + + // // If this delivery has traveled further than the known radius of the network topology (plus 1), // release and settle the delivery. This can happen in the case of "flood" multicast where the // deliveries follow all available paths. This will only discard messages that will reach their --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org