This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 042a647 DISPATCH-1852: Added code to accumulate DATA frames in the message body in case credit does not arrive and the header has not been routed. This closes #926 042a647 is described below commit 042a647e3e6df0a7871c9f3d194e64dd95bc07a1 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Fri Nov 20 10:02:41 2020 -0500 DISPATCH-1852: Added code to accumulate DATA frames in the message body in case credit does not arrive and the header has not been routed. This closes #926 --- src/adaptors/http2/http2_adaptor.c | 112 +++++++++++++++++++++---------------- src/adaptors/http2/http2_adaptor.h | 1 + 2 files changed, 64 insertions(+), 49 deletions(-) diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index a2ee4fe..85760de 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -414,14 +414,23 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, return 0; stream_data->bytes_in += len; - qd_buffer_list_t buffers; DEQ_INIT(buffers); qd_buffer_list_append(&buffers, (uint8_t *)data, len); - if (stream_data->in_dlv) { - if (!stream_data->in_dlv_released) + // + // DISPATCH-: If an in_dlv is present it means that the qdr_link_deliver() has already been called (delivery has already been routed) + // in which case qd_message_stream_data_append can be called to append buffers to the message body + // If stream_data->in_dlv = 0 but stream_data->header_and_props_composed is true, it means that the message has not been routed yet + // but the message already has headers and properties + // in which case the qd_message_stream_data_append() can be called to add body data to the message. + // In many cases when the response message is streamed by a server, the entire message body can arrive before we get credit to route it. + // We want to be able to keep collecting the incoming DATA in the message object so we can ultimately route it when the credit does ultimately arrive. + // + if (stream_data->in_dlv || stream_data->header_and_props_composed) { + if (!stream_data->in_dlv_released) { qd_message_stream_data_append(stream_data->message, &buffers); + } } else { if (!stream_data->body) { @@ -655,29 +664,61 @@ static int on_header_callback(nghttp2_session *session, } -static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_composed_field_t *header_and_props, qdr_http2_connection_t *conn, bool receive_complete) +static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data, bool receive_complete) { - if (receive_complete) { - if (!stream_data->body) { - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_compose_insert_binary(stream_data->body, 0, 0); - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id); + if (!stream_data->header_and_props_composed) { + qd_composed_field_t *header_and_props = 0; + if (conn->ingress) { + header_and_props = qd_message_compose_amqp(stream_data->message, + conn->config->address, // const char *to + 0, // const char *subject + stream_data->reply_to, // const char *reply_to + 0, // const char *content_type + 0, // const char *content_encoding + 0, // int32_t correlation_id + conn->config->site); } - } - if (stream_data->body) { - qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete); - } - else { - qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + else { + header_and_props = qd_message_compose_amqp(stream_data->message, + stream_data->reply_to, // const char *to + 0, // const char *subject + 0, // const char *reply_to + 0, // const char *content_type + 0, // const char *content_encoding + 0, // int32_t correlation_id + conn->config->site); + } + + if (receive_complete) { + if (!stream_data->body) { + stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary(stream_data->body, 0, 0); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id); + } + } + if (stream_data->body) { + qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete); + } + else { + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + } + + // The header and properties have been added. Now we can start adding BODY DATA to this message. + stream_data->header_and_props_composed = true; + qd_compose_free(header_and_props); } qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity); if (!stream_data->in_dlv && stream_data->in_link_credit > 0) { stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0); - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery in compose_and_deliver dlv:%lx", stream_data->session_data->conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery in compose_and_deliver dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv); qdr_delivery_set_context(stream_data->in_dlv, stream_data); qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, "http2_adaptor - compose_and_deliver - release protection of return from deliver"); stream_data->in_link_credit -= 1; + return true; + } + else { + return false; } } @@ -689,44 +730,17 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co return false; } - qd_composed_field_t *header_and_props = 0; - - if (stream_data->in_link_credit == 0) { - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] No credit on in_link, not routing delivery", conn->conn_id, stream_data->stream_id); - return false; - } - bool delivery_routed = false; if (conn->ingress) { if (stream_data->reply_to && stream_data->entire_header_arrived && !stream_data->in_dlv) { - header_and_props = qd_message_compose_amqp(stream_data->message, - conn->config->address, // const char *to - 0, // const char *subject - stream_data->reply_to, // const char *reply_to - 0, // const char *content_type - 0, // const char *content_encoding - 0, // int32_t correlation_id - conn->config->site); - compose_and_deliver(stream_data, header_and_props, conn, receive_complete); - qd_compose_free(header_and_props); - delivery_routed = true; + delivery_routed = compose_and_deliver(conn, stream_data, receive_complete); } } else { - if (stream_data->entire_header_arrived) { - header_and_props = qd_message_compose_amqp(stream_data->message, - stream_data->reply_to, // const char *to - 0, // const char *subject - 0, // const char *reply_to - 0, // const char *content_type - 0, // const char *content_encoding - 0, // int32_t correlation_id - conn->config->site); + if (stream_data->entire_header_arrived && !stream_data->in_dlv) { qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Calling compose_and_deliver, routing delivery", conn->conn_id, stream_data->stream_id); - compose_and_deliver(stream_data, header_and_props, conn, receive_complete); - qd_compose_free(header_and_props); - delivery_routed = true; + delivery_routed = compose_and_deliver(conn, stream_data, receive_complete); } } @@ -841,9 +855,9 @@ static int on_frame_recv_callback(nghttp2_session *session, qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA frame received", conn->conn_id, stream_id); if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) { - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, receive_complete = true", conn->conn_id, stream_id); if (!stream_data->in_dlv_released) { qd_message_set_receive_complete(stream_data->message); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, setting receive_complete = true", conn->conn_id, stream_id); } advance_stream_status(stream_data); @@ -1273,7 +1287,7 @@ static void qdr_http_flow(void *context, qdr_link_t *link, int credit) stream_data->in_link_credit += credit; if (!stream_data->in_dlv) { if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) { - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] qdr_http_flow, delivery routed successfully", stream_data->session_data->conn->conn_id); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] qdr_http_flow, delivery routed successfully", stream_data->session_data->conn->conn_id, stream_data->stream_id); } } } @@ -1448,7 +1462,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c) qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c); if (conn) { if (conn->pn_raw_conn) { - qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id); pn_raw_connection_wake(conn->pn_raw_conn); } else if (conn->activate_timer) { diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index b900bad..c5adfc6 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -105,6 +105,7 @@ struct qdr_http2_stream_data_t { bool disp_updated; // Has the disposition already been set on the out_dlv bool disp_applied; // Has the disp been applied to the out_dlv. The stream is ready to be freed now. bool in_dlv_released; + bool header_and_props_composed; // true if the header and properties of the inbound message have already been composed so we don't have to do it again. //for stats: char *method; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org