This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 1b96ceb DISPATCH-1940, DISPATCH-2198: Added code to handle http2 and q2 flow control window updates. This closes #1301 1b96ceb is described below commit 1b96ceb8818ae77949275f1d9616e07ca51ce5a0 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Mon Jun 7 15:49:37 2021 -0400 DISPATCH-1940, DISPATCH-2198: Added code to handle http2 and q2 flow control window updates. This closes #1301 --- include/qpid/dispatch/message.h | 12 + src/adaptors/http2/http2_adaptor.c | 438 +++++++++++++++++++++++++------------ src/adaptors/http2/http2_adaptor.h | 11 +- src/message.c | 31 ++- tests/http2_server.py | 8 + tests/http2_slow_q2_server.py | 107 +++++++++ tests/images/test.jpg | Bin 0 -> 9634665 bytes tests/system_tests_http2.py | 140 +++++++++++- 8 files changed, 593 insertions(+), 154 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 6072e6d..4297b9c 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -416,6 +416,18 @@ qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, q /** + * qd_message_stream_data_footer_append + * + * Constructs a footer field by calling the qd_compose(QD_PERFORMATIVE_FOOTER, field); + * It then inserts the passed in buffer list to the composed field and proceeds to disable q2 before finally adding the footer + * field to the message. + * + * Use this function if you have the complete footer data available in the passed in buffer list + */ +int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props); + + +/** * qd_message_stream_data_append * * Append the buffers in data as a sequence of one or more BODY_DATA sections diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 1f3aaf4..b5d5f61 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -65,6 +65,7 @@ typedef struct qdr_http2_adaptor_t { static qdr_http2_adaptor_t *http2_adaptor; +const int32_t WINDOW_SIZE = 65536; static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context); static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data); @@ -178,10 +179,30 @@ void qd_http2_buffer_list_append(qd_http2_buffer_list_t *buflist, const uint8_t } } + +// Per-message callback to resume receiving after Q2 is unblocked on the +// incoming link (to HTTP2 app). This routine runs on another I/O thread so it +// must be thread safe and hence we use the server activation lock +// +static void qdr_http2_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) +{ + // prevent the conn from being deleted while running: + sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server)); + + qdr_http2_connection_t *conn = (qdr_http2_connection_t*)qd_alloc_deref_safe_ptr(&context); + if (conn && conn->pn_raw_conn) { + SET_ATOMIC_FLAG(&conn->q2_restart); + pn_raw_connection_wake(conn->pn_raw_conn); + } + + sys_mutex_unlock(qd_server_get_activation_lock(http2_adaptor->core->qd->server)); +} + /** * HTTP :path is mapped to the AMQP 'to' field. */ -qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg, +qd_composed_field_t *qd_message_compose_amqp(qdr_http2_connection_t *conn, + qd_message_t *msg, const char *to, const char *subject, const char *reply_to, @@ -263,11 +284,16 @@ qd_composed_field_t *qd_message_compose_amqp(qd_message_t *msg, } qd_compose_end_list(field); + qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn); + qd_message_set_q2_unblocked_handler(msg, qdr_http2_q2_unblocked_handler, conn_sp); + return field; } static size_t write_buffers(qdr_http2_connection_t *conn) { + if (!conn->pn_raw_conn) + return 0; qdr_http2_session_data_t *session_data = conn->session_data; if (!conn->pn_raw_conn) @@ -319,6 +345,7 @@ static size_t write_buffers(qdr_http2_connection_t *conn) return 0; } + static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on_shutdown) { if (!stream_data) @@ -342,7 +369,7 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on } free(stream_data->reply_to); qd_compose_free(stream_data->app_properties); - qd_compose_free(stream_data->body); + qd_buffer_list_free_buffers(&stream_data->body_buffers); qd_compose_free(stream_data->footer_properties); if (DEQ_SIZE(session_data->streams) > 0) { DEQ_REMOVE(session_data->streams, stream_data); @@ -360,12 +387,29 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on qd_message_free(stream_data->message); } + // + // If the client/server closed the connection abruptly, we need to release the stream_data->curr_stream_data and + // stream_data->next_stream_data. + // This final decref of the delivery is going to free the associated message but before this message can be freed + // all stream data (body data) objects need to be freed. We do this here. + // if (stream_data->in_dlv && !stream_data->in_dlv_decrefed) { + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; + + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; + qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, "HTTP2 adaptor in_dlv - free_http2_stream_data"); } if (stream_data->out_dlv && !stream_data->out_dlv_decrefed) { - qdr_delivery_decref(http2_adaptor->core, stream_data->out_dlv, "HTTP2 adaptor out_dlv - free_http2_stream_data"); + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; + + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; + qdr_delivery_decref(http2_adaptor->core, stream_data->out_dlv, "HTTP2 adaptor out_dlv - free_http2_stream_data"); } free_qdr_http2_stream_data_t(stream_data); @@ -401,8 +445,10 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo http_conn->context.context = 0; - if (http_conn->session_data->session) + if (http_conn->session_data->session) { nghttp2_session_del(http_conn->session_data->session); + http_conn->session_data->session = 0; + } free_qdr_http2_session_data_t(http_conn->session_data); http_conn->session_data = 0; @@ -416,11 +462,11 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo free_qd_http2_buffer_t(buff); buff = DEQ_HEAD(http_conn->granted_read_buffs); } - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing http2 connection in free_qdr_http2_connection", http_conn->conn_id); sys_atomic_destroy(&http_conn->raw_closed_read); sys_atomic_destroy(&http_conn->raw_closed_write); + sys_atomic_destroy(&http_conn->q2_restart); free_qdr_http2_connection_t(http_conn); } @@ -439,6 +485,7 @@ static qdr_http2_stream_data_t *create_http2_stream_data(qdr_http2_session_data_ stream_data->session_data = session_data; stream_data->app_properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); stream_data->status = QD_STREAM_OPEN; + DEQ_INIT(stream_data->body_buffers); stream_data->start = qd_timer_now(); qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Creating new stream_data->app_properties=QD_PERFORMATIVE_APPLICATION_PROPERTIES", session_data->conn->conn_id, stream_id); qd_compose_start_map(stream_data->app_properties); @@ -450,6 +497,17 @@ static qdr_http2_stream_data_t *create_http2_stream_data(qdr_http2_session_data_ /** + * This callback function is invoked when the nghttp2 library tells the application about the error code, and error message. + */ +static int on_error_callback(nghttp2_session *session, int lib_error_code, const char *msg, size_t len, void *user_data) +{ + qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"] Error generated in the on_error_callback, lib_error_code=%i, error_msg=%s", conn->conn_id, lib_error_code, msg); + return 0; +} + + +/** * Callback function invoked by nghttp2_session_recv() and nghttp2_session_mem_recv() when an invalid non-DATA frame is received */ static int on_invalid_frame_recv_callback(nghttp2_session *session, const nghttp2_frame *frame, int lib_error_code, void *user_data) @@ -475,13 +533,14 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, if (!stream_data) return 0; + if(stream_data->stream_force_closed) + return 0; + stream_data->bytes_in += len; - qd_buffer_list_t buffers; - DEQ_INIT(buffers); - qd_buffer_list_append(&buffers, (uint8_t *)data, len); + // - // DISPATCH-: If an in_dlv is present it means that the qdr_link_deliver() has already been called (delivery has already been routed) + // DISPATCH-1868: If an in_dlv is present it means that the qdr_link_deliver() has already been called (delivery has already been routed) // in which case qd_message_stream_data_append can be called to append buffers to the message body // If stream_data->in_dlv = 0 but stream_data->header_and_props_composed is true, it means that the message has not been routed yet // but the message already has headers and properties @@ -490,43 +549,32 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, // We want to be able to keep collecting the incoming DATA in the message object so we can ultimately route it when the credit does ultimately arrive. // if (stream_data->in_dlv || stream_data->header_and_props_composed) { - if (!stream_data->stream_force_closed) { - // DISPATCH-1868: Part of the HTTP2 message body arrives *before* we can route the delivery. So we accumulated that body - // in the stream_data->body (in the else part). But before the rest of the HTTP2 data arrives, we got credit to send the delivery - // and we have an in_dlv object now. Now, we take the buffers that were added previously to stream_data->body and call qd_message_stream_data_append - if (stream_data->body) { - if (!stream_data->body_data_added) { - qd_buffer_list_t existing_buffers; - DEQ_INIT(existing_buffers); - qd_compose_take_buffers(stream_data->body, &existing_buffers); - // @TODO(kgiusti): handle Q2 block event: - qd_message_stream_data_append(stream_data->message, &existing_buffers, 0); - stream_data->body_data_added = true; - } + qd_buffer_list_t buffers; + DEQ_INIT(buffers); + qd_buffer_list_append(&buffers, (uint8_t *)data, len); + // DISPATCH-1868: Part of the HTTP2 message body arrives *before* we can route the delivery. So we accumulated the body buffers + // in the stream_data->body_buffers. But before the rest of the HTTP2 data arrives, we got credit to send the delivery + // and we have an in_dlv object now. Now, we take the buffers that were added previously to stream_data->body_buffers and call qd_message_stream_data_append + bool q2_blocked1 = false; + if (DEQ_SIZE(stream_data->body_buffers) > 0) { + if (!stream_data->body_data_added_to_msg) { + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, &q2_blocked1); } - else { - // Add a dummy body so that other code that checks for the presense of stream_data->body will be satisfied. - // This dummy body field will be be used and will not be sent. - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - stream_data->body_data_added = true; - } - // @TODO(kgiusti): handle Q2 block event: - qd_message_stream_data_append(stream_data->message, &buffers, 0); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, stream_id); } - else { - qd_buffer_list_free_buffers(&buffers); + bool q2_blocked2 = false; + qd_message_stream_data_append(stream_data->message, &buffers, &q2_blocked2); + stream_data->body_data_added_to_msg = true; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, stream_id); + conn->q2_blocked = conn->q2_blocked || q2_blocked1 || q2_blocked2; + + if (conn->q2_blocked) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id); } } else { - if (stream_data->stream_force_closed) { - qd_buffer_list_free_buffers(&buffers); - } - else { - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, stream_data->body); - qd_compose_insert_binary_buffers(stream_data->body, &buffers); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->body", conn->conn_id, stream_id); - } + // Keep inserting buffers to stream_data->body_buffers. + qd_buffer_list_append(&stream_data->body_buffers, (uint8_t *)data, len); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback qd_compose_insert_binary_buffers into stream_data->body_buffers", conn->conn_id, stream_id); } qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback data length %zu", conn->conn_id, stream_id, len); @@ -566,6 +614,8 @@ static int snd_data_callback(nghttp2_session *session, qdr_http2_session_data_t *session_data = conn->session_data; qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t *)source->ptr; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback length=%zu", conn->conn_id, stream_data->stream_id, length); + int bytes_sent = 0; // This should not include the header length of 9. bool write_buffs = false; if (length) { @@ -576,25 +626,44 @@ static int snd_data_callback(nghttp2_session *session, memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH); qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH); pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send]; - qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send); + int written = qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, stream_data->curr_stream_data_qd_buff_offset, stream_data->qd_buffers_to_send); + + assert (written == stream_data->qd_buffers_to_send); int idx = 0; + size_t bytes_to_send = length; + while (idx < stream_data->qd_buffers_to_send) { if (pn_raw_buffs[idx].size > 0) { - //int bytes_remaining = length - bytes_sent; - //if (bytes_remaining > pn_raw_buffs[idx].size) { + if (bytes_to_send < pn_raw_buffs[idx].size) { + int bytes_remaining_in_buffer = pn_raw_buffs[idx].size - stream_data->curr_stream_data_offset; + if (bytes_remaining_in_buffer < bytes_to_send) { + memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, bytes_remaining_in_buffer); + qd_http2_buffer_insert(http2_buff, bytes_remaining_in_buffer); + stream_data->curr_stream_data_offset = 0; + bytes_to_send -= bytes_remaining_in_buffer; + bytes_sent += bytes_remaining_in_buffer; + } + else { + memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, bytes_to_send); + qd_http2_buffer_insert(http2_buff, bytes_to_send); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send); + stream_data->curr_stream_data_offset += bytes_to_send; + bytes_sent += bytes_to_send; + if (stream_data->curr_stream_data_offset == BUFFER_SIZE || stream_data->curr_stream_data_offset == pn_raw_buffs[idx].size) { + stream_data->curr_stream_data_offset = 0; + stream_data->curr_stream_data_qd_buff_offset += 1; + } + } + } + else { memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size); qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size); - bytes_sent += pn_raw_buffs[idx].size; qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%u", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size); -// } -// else { -// memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, bytes_remaining); -// qd_http2_buffer_insert(http2_buff, bytes_remaining); -// bytes_sent += bytes_remaining; -// qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", conn->conn_id, stream_data->stream_id, bytes_remaining); -// } - stream_data->curr_stream_data_qd_buff_offset += 1; + stream_data->curr_stream_data_qd_buff_offset += 1; + bytes_to_send -= pn_raw_buffs[idx].size; + bytes_sent += pn_raw_buffs[idx].size; + } } idx += 1; } @@ -617,7 +686,12 @@ static int snd_data_callback(nghttp2_session *session, else { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, out_msg_has_footer", conn->conn_id, stream_data->stream_id); } + stream_data->curr_stream_data_offset = 0; stream_data->curr_stream_data_qd_buff_offset = 0; + stream_data->payload_handled = 0; + } + else { + stream_data->payload_handled += bytes_sent; } qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data); @@ -626,8 +700,9 @@ static int snd_data_callback(nghttp2_session *session, assert(bytes_sent == length); } - if (write_buffs) + if (write_buffs) { write_buffers(conn); + } return 0; @@ -741,6 +816,8 @@ static int on_header_callback(nghttp2_session *session, qd_compose_insert_string_n(stream_data->footer_properties, (const char *)name, namelen); qd_compose_insert_string_n(stream_data->footer_properties, (const char *)value, valuelen); + + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 FOOTER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value); } else { if (strcmp(METHOD, (const char *)name) == 0) { @@ -751,8 +828,10 @@ static int on_header_callback(nghttp2_session *session, } qd_compose_insert_string_n(stream_data->app_properties, (const char *)name, namelen); qd_compose_insert_string_n(stream_data->app_properties, (const char *)value, valuelen); + + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value); } - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, stream_data->stream_id, (char *)name, (char *)value); + } break; default: @@ -767,7 +846,8 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d if (!stream_data->header_and_props_composed) { qd_composed_field_t *header_and_props = 0; if (conn->ingress) { - header_and_props = qd_message_compose_amqp(stream_data->message, + header_and_props = qd_message_compose_amqp(conn, + stream_data->message, conn->config->address, // const char *to stream_data->method, // const char *subject stream_data->reply_to, // const char *reply_to @@ -777,7 +857,8 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d conn->config->site); } else { - header_and_props = qd_message_compose_amqp(stream_data->message, + header_and_props = qd_message_compose_amqp(conn, + stream_data->message, stream_data->reply_to, // const char *to stream_data->request_status, // const char *subject 0, // const char *reply_to @@ -789,41 +870,78 @@ static bool compose_and_deliver(qdr_http2_connection_t *conn, qdr_http2_stream_d if (receive_complete) { qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = true in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity); - if (!stream_data->body) { - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_compose_insert_binary(stream_data->body, 0, 0); - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", conn->conn_id, stream_data->stream_id); - } - + bool q2_blocked; if (stream_data->footer_properties) { - qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete); + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, &q2_blocked); + stream_data->body_data_added_to_msg = true; + + qd_buffer_list_t existing_buffers; + DEQ_INIT(existing_buffers); + qd_compose_take_buffers(stream_data->footer_properties, &existing_buffers); + qd_message_stream_data_footer_append(stream_data->message, &existing_buffers); } else { - qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete); + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, &q2_blocked); + stream_data->body_data_added_to_msg = true; } + + conn->q2_blocked = conn->q2_blocked || q2_blocked; + if (conn->q2_blocked) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id); + } } else { - if (stream_data->body) { - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has stream_data->body in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity); + if (DEQ_SIZE(stream_data->body_buffers) > 0) { + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has stream_data->body_buffers in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity); + bool q2_blocked; if (stream_data->footer_properties) { - qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete); + if (!stream_data->entire_footer_arrived) { + qd_compose_free(header_and_props); + return false; + } + + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, &q2_blocked); + qd_buffer_list_t existing_buffers; + DEQ_INIT(existing_buffers); + qd_compose_take_buffers(stream_data->footer_properties, &existing_buffers); + qd_message_stream_data_footer_append(stream_data->message, &existing_buffers); } else { - qd_message_compose_4(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, receive_complete); + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, &q2_blocked); } - stream_data->body_data_added = true; + stream_data->body_data_added_to_msg = true; + conn->q2_blocked = conn->q2_blocked || q2_blocked; + if (conn->q2_blocked) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id); + } } else { - if (stream_data->footer_properties) { + + if (!stream_data->entire_footer_arrived) { + qd_compose_free(header_and_props); + return false; + } + // // The footer has already arrived but there was no body. Insert an empty body // - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_message_compose_5(stream_data->message, header_and_props, stream_data->app_properties, stream_data->body, stream_data->footer_properties, receive_complete); + qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + qd_message_stream_data_append(stream_data->message, &stream_data->body_buffers, 0); + + qd_buffer_list_t existing_buffers; + DEQ_INIT(existing_buffers); + qd_compose_take_buffers(stream_data->footer_properties, &existing_buffers); + qd_message_stream_data_footer_append(stream_data->message, &existing_buffers); + stream_data->body_data_added_to_msg = true; } else { qd_message_compose_3(stream_data->message, header_and_props, stream_data->app_properties, receive_complete); + stream_data->body_data_added_to_msg = false; } } } @@ -851,7 +969,7 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co { qdr_http2_connection_t *conn = stream_data->session_data->conn; if (stream_data->in_dlv) { - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] in_dlv already present, not routing delivery", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] in_dlv already present, delivery already routed", conn->conn_id, stream_data->stream_id); return false; } @@ -879,7 +997,7 @@ static void create_settings_frame(qdr_http2_connection_t *conn) { qdr_http2_session_data_t *session_data = conn->session_data; nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}, - {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 65536}, + {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, WINDOW_SIZE}, {NGHTTP2_SETTINGS_ENABLE_PUSH, 0}}; // You must call nghttp2_session_send after calling nghttp2_submit_settings @@ -954,8 +1072,8 @@ static int on_frame_recv_callback(nghttp2_session *session, // We will also close the pn_raw_connection (we will not close the qdr_connection_t and the qdr_http2_connection_t, those will still remain). This will close the TCP connection to the server // and will enable creation of a new connection to the server since we are not allowed to create any more streams on the connection that received the GOAWAY frame. // - qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] GOAWAY frame received", conn->conn_id, stream_id); int32_t last_stream_id = frame->goaway.last_stream_id; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] GOAWAY frame received, last_stream_id=[%"PRId32"]", conn->conn_id, stream_id, last_stream_id); // Free all streams that are greater that the last_stream_id because the server is not going to process those streams. free_unprocessed_streams(conn, last_stream_id); conn->goaway_received = true; @@ -995,15 +1113,6 @@ static int on_frame_recv_callback(nghttp2_session *session, } if (stream_data->in_dlv && !stream_data->stream_force_closed) { - if (!stream_data->body) { - stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_compose_insert_binary(stream_data->body, 0, 0); - // @TODO(kgiusti): handle Q2 block event: - qd_message_extend(stream_data->message, stream_data->body, 0); - } - } - - if (stream_data->in_dlv && !stream_data->stream_force_closed) { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA frame received, qdr_delivery_continue "DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv)); qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, false); } @@ -1034,7 +1143,6 @@ static int on_frame_recv_callback(nghttp2_session *session, if (stream_data->use_footer_properties) { qd_compose_end_map(stream_data->footer_properties); stream_data->entire_footer_arrived = true; - // @TODO(kgiusti): handle Q2 block event: qd_message_extend(stream_data->message, stream_data->footer_properties, 0); qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Closing footer map, extending message with footer", conn->conn_id, stream_id); } @@ -1095,6 +1203,23 @@ static int on_frame_recv_callback(nghttp2_session *session, return 0; } +static void set_buffers_to_send(qdr_http2_stream_data_t *stream_data, size_t bytes_to_send) +{ + size_t diff = qd_message_stream_data_buffer_count(stream_data->curr_stream_data) - stream_data->curr_stream_data_qd_buff_offset; + pn_raw_buffer_t pn_raw_buffs[diff]; + int written = qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, stream_data->curr_stream_data_qd_buff_offset, diff); + assert(diff == written); + int idx = 0; + int rolling_count = 0; + rolling_count -= stream_data->curr_stream_data_offset; + while (idx < written) { + rolling_count += pn_raw_buffs[idx].size; + idx+=1; + if (rolling_count >= bytes_to_send) + break; + } + stream_data->qd_buffers_to_send = idx; +} ssize_t read_data_callback(nghttp2_session *session, int32_t stream_id, @@ -1109,10 +1234,9 @@ ssize_t read_data_callback(nghttp2_session *session, qd_message_t *message = qdr_delivery_message(stream_data->out_dlv); qd_message_depth_status_t status = qd_message_check_depth(message, QD_DEPTH_BODY); - // This flag tells nghttp2 that the data is not being copied into its buffer (uint8_t *buf). + // This flag tells nghttp2 that the data is not being copied into the buffer supplied by nghttp2 (uint8_t *buf). *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - switch (status) { case QD_MESSAGE_DEPTH_OK: { // @@ -1134,7 +1258,7 @@ ssize_t read_data_callback(nghttp2_session *session, qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_stream_data", conn->conn_id, stream_data->stream_id); } - if (stream_data->next_stream_data == 0 && stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { + if (stream_data->next_stream_data == 0 && (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE || stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INVALID)) { stream_data->curr_stream_data_result = stream_data->next_stream_data_result; } @@ -1163,6 +1287,7 @@ ssize_t read_data_callback(nghttp2_session *session, // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { if (!stream_data->out_msg_has_footer) { qd_message_stream_data_release(stream_data->curr_stream_data); @@ -1196,53 +1321,52 @@ ssize_t read_data_callback(nghttp2_session *session, return 0; } - stream_data->stream_data_buff_count = qd_message_stream_data_buffer_count(stream_data->curr_stream_data); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->stream_data_buff_count=%i, payload_length=%zu", conn->conn_id, stream_data->stream_id, stream_data->stream_data_buff_count, payload_length); - size_t bytes_to_send = 0; if (payload_length) { - size_t remaining_payload_length = payload_length - (stream_data->curr_stream_data_qd_buff_offset * BUFFER_SIZE); + int remaining_payload_length = payload_length - stream_data->payload_handled; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length=%i, length=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, length); if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) { - bytes_to_send = remaining_payload_length; - stream_data->qd_buffers_to_send = stream_data->stream_data_buff_count; - stream_data->full_payload_handled = true; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send); - - // Look ahead one body data - stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); - if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - stream_data->out_msg_data_flag_eof = true; - stream_data->out_msg_body_sent = true; - if (stream_data->next_stream_data) { - qd_message_stream_data_release(stream_data->next_stream_data); - stream_data->next_stream_data = 0; + if (length < remaining_payload_length) { + bytes_to_send = length; + set_buffers_to_send(stream_data, bytes_to_send); + stream_data->full_payload_handled = false; + } + else { + bytes_to_send = remaining_payload_length; + set_buffers_to_send(stream_data, bytes_to_send); + stream_data->full_payload_handled = true; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send); + + // Look ahead one body data + stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { + *data_flags |= NGHTTP2_DATA_FLAG_EOF; + stream_data->out_msg_data_flag_eof = true; + stream_data->out_msg_body_sent = true; + stream_data->out_dlv_local_disposition = PN_ACCEPTED; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); } - stream_data->out_dlv_local_disposition = PN_ACCEPTED; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); - } - else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) { - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id); - - } - else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_OK", conn->conn_id, stream_data->stream_id); - - } - else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) { - stream_data->out_msg_body_sent = true; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id); - } + else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_FOOTER_OK) { + stream_data->out_msg_body_sent = true; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id); + } + } } else { - // This means that there is more that 16k worth of payload in one body data. - // We want to send only 16k data per read_data_callback - bytes_to_send = QD_HTTP2_BUFFER_SIZE; - stream_data->full_payload_handled = false; - stream_data->qd_buffers_to_send = NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send); + if (length < remaining_payload_length) { + bytes_to_send = length; + set_buffers_to_send(stream_data, bytes_to_send); + } + else { + // This means that there is more that 16k worth of payload in one body data. + // We want to send only 16k data per read_data_callback + bytes_to_send = QD_HTTP2_BUFFER_SIZE; + set_buffers_to_send(stream_data, bytes_to_send); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send); + } + stream_data->full_payload_handled = false; } } @@ -1252,12 +1376,19 @@ ssize_t read_data_callback(nghttp2_session *session, } case QD_MESSAGE_STREAM_DATA_FOOTER_OK: - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, stream_data->stream_id); stream_data->out_msg_has_footer = true; stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); if (stream_data->next_stream_data) { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK, we have a next_stream_data", conn->conn_id, stream_data->stream_id); } + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INVALID) { + stream_data->out_msg_has_footer = false; + if (stream_data->next_stream_data) { + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; + } + } break; case QD_MESSAGE_STREAM_DATA_INCOMPLETE: @@ -1271,7 +1402,6 @@ ssize_t read_data_callback(nghttp2_session *session, case QD_MESSAGE_STREAM_DATA_NO_MORE: { // // We have already handled the last body-data segment for this delivery. - // Complete the "sending" of this delivery and replenish credit. // size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn); if (pn_buffs_write_capacity == 0) { @@ -1310,9 +1440,10 @@ ssize_t read_data_callback(nghttp2_session *session, // *data_flags |= NGHTTP2_DATA_FLAG_EOF; stream_data->out_msg_data_flag_eof = true; - if (stream_data->curr_stream_data) + if (stream_data->curr_stream_data) { qd_message_stream_data_release(stream_data->curr_stream_data); - stream_data->curr_stream_data = 0; + stream_data->curr_stream_data = 0; + } stream_data->out_dlv_local_disposition = PN_REJECTED; qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INVALID", conn->conn_id, stream_data->stream_id); break; @@ -1349,7 +1480,7 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_listener_t* listener ingress_http_conn->pn_raw_conn = pn_raw_connection(); sys_atomic_init(&ingress_http_conn->raw_closed_read, 0); sys_atomic_init(&ingress_http_conn->raw_closed_write, 0); - + sys_atomic_init(&ingress_http_conn->q2_restart, 0); ingress_http_conn->session_data = new_qdr_http2_session_data_t(); ZERO(ingress_http_conn->session_data); DEQ_INIT(ingress_http_conn->session_data->buffs); @@ -1659,6 +1790,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) { //stream_data->processing = true; qdr_http2_session_data_t *session_data = stream_data->session_data; + + if (!stream_data->session_data) + return 0; + qdr_http2_connection_t *conn = session_data->conn; if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) @@ -1666,6 +1801,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Starting to handle_outgoing_http", conn->conn_id); if (stream_data->out_dlv) { + qd_message_t *message = qdr_delivery_message(stream_data->out_dlv); if (stream_data->out_msg_send_complete) { @@ -1674,6 +1810,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) } if (!stream_data->out_msg_header_sent) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Header not sent yet", conn->conn_id); qd_iterator_t *group_id_itr = qd_message_field_iterator(message, QD_FIELD_GROUP_ID); @@ -1721,8 +1858,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data); if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data); + if (payload_length == 0) { - stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { if (stream_data->next_stream_data) { qd_message_stream_data_release(stream_data->next_stream_data); @@ -1730,6 +1869,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) } qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; flags = NGHTTP2_FLAG_END_STREAM; stream_data->out_msg_has_body = false; @@ -1767,6 +1907,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) free(hdrs[idx].name); free(hdrs[idx].value); } + } else { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] Headers already submitted, Proceeding with the body", conn->conn_id, stream_data->stream_id); @@ -1795,8 +1936,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] Error submitting data rv=%i", conn->conn_id, stream_data->stream_id, rv); } else { - nghttp2_session_send(session_data->session); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id); + if (session_data->session) { + nghttp2_session_send(session_data->session); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, stream_data->stream_id); + } } } } @@ -2067,7 +2210,7 @@ static int handle_incoming_http(qdr_http2_connection_t *conn) else { grant_read_buffers(conn); } - nghttp2_session_send(conn-> session_data->session); + nghttp2_session_send(conn->session_data->session); return count; } @@ -2322,6 +2465,8 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto egress_http_conn->session_data->conn = egress_http_conn; sys_atomic_init(&egress_http_conn->raw_closed_read, 0); sys_atomic_init(&egress_http_conn->raw_closed_write, 0); + sys_atomic_init(&egress_http_conn->q2_restart, 0); + sys_mutex_lock(http2_adaptor->lock); DEQ_INSERT_TAIL(http2_adaptor->connections, egress_http_conn); sys_mutex_unlock(http2_adaptor->lock); @@ -2395,6 +2540,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } case PN_RAW_CONNECTION_CLOSED_READ: { + if (conn->q2_blocked) { + conn->q2_blocked = false; + } SET_ATOMIC_FLAG(&conn->raw_closed_read); if (conn->pn_raw_conn) pn_raw_connection_close(conn->pn_raw_conn); @@ -2419,10 +2567,6 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } conn->connection_established = false; - if (conn->goaway_received) { - nghttp2_session_del(conn->session_data->session); - conn->session_data->session = 0; - } handle_disconnected(conn); break; } @@ -2437,10 +2581,21 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_WAKE: { qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE Wake-up", conn->conn_id); + if (CLEAR_ATOMIC_FLAG(&conn->q2_restart)) { + conn->q2_blocked = false; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] q2 is unblocked on this connection", conn->conn_id); + handle_incoming_http(conn); + } + while (qdr_connection_process(conn->qdr_conn)) {} break; } case PN_RAW_CONNECTION_READ: { + // We don't want to read when we are q2 blocked. + if (conn->q2_blocked) { + return; + } + int read = handle_incoming_http(conn); qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes", conn->conn_id, read); break; @@ -2662,6 +2817,7 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void **adaptor_context) nghttp2_session_callbacks_set_send_data_callback(callbacks, snd_data_callback); nghttp2_session_callbacks_set_send_callback(callbacks, send_callback); nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, on_invalid_frame_recv_callback); + nghttp2_session_callbacks_set_error_callback2(callbacks, on_error_callback); adaptor->callbacks = callbacks; http2_adaptor = adaptor; diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index 7aafc2f..9373066 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -83,7 +83,7 @@ struct qdr_http2_stream_data_t { qd_message_t *message; qd_composed_field_t *app_properties; qd_composed_field_t *footer_properties; - qd_composed_field_t *body; + qd_buffer_list_t body_buffers; qd_message_stream_data_t *curr_stream_data; qd_message_stream_data_t *next_stream_data; qd_message_stream_data_t *footer_stream_data; @@ -92,7 +92,8 @@ struct qdr_http2_stream_data_t { qd_message_stream_data_result_t curr_stream_data_result; qd_message_stream_data_result_t next_stream_data_result; int curr_stream_data_qd_buff_offset; - int stream_data_buff_count; + int curr_stream_data_offset; // The offset within the qd_buffer so we can jump there. + int payload_handled; int in_link_credit; // provided by router int32_t stream_id; size_t qd_buffers_to_send; @@ -111,10 +112,9 @@ struct qdr_http2_stream_data_t { bool disp_applied; // Has the disp been applied to the out_dlv. The stream is ready to be freed now. bool header_and_props_composed; // true if the header and properties of the inbound message have already been composed so we don't have to do it again. bool stream_force_closed; - bool in_dlv_decrefed; bool out_dlv_decrefed; - bool body_data_added; + bool body_data_added_to_msg; int bytes_in; int bytes_out; qd_timestamp_t start; @@ -152,7 +152,8 @@ struct qdr_http2_connection_t { bool goaway_received; sys_atomic_t raw_closed_read; sys_atomic_t raw_closed_write; - + bool q2_blocked; // send a connection level WINDOW_UPDATE frame to tell the client to stop sending data. + sys_atomic_t q2_restart; // signal to resume receive DEQ_LINKS(qdr_http2_connection_t); }; diff --git a/src/message.c b/src/message.c index 2d8d579..e204730 100644 --- a/src/message.c +++ b/src/message.c @@ -42,6 +42,8 @@ #include <string.h> #include <time.h> + #define CHECK_Q2(blist) assert(DEQ_SIZE(blist) <= QD_QLIMIT_Q2_LOWER) + #define LOCK sys_mutex_lock #define UNLOCK sys_mutex_unlock @@ -2390,8 +2392,11 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com qd_message_content_t *content = MSG_CONTENT(msg); SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); + CHECK_Q2(*field1_buffers); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); + CHECK_Q2(*field2_buffers); qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3); + CHECK_Q2(*field3_buffers); content->buffers = *field1_buffers; DEQ_INIT(*field1_buffers); @@ -2404,10 +2409,13 @@ void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *field1, qd_com qd_message_content_t *content = MSG_CONTENT(msg); SET_ATOMIC_BOOL(&content->receive_complete, receive_complete); qd_buffer_list_t *field1_buffers = qd_compose_buffers(field1); + CHECK_Q2(*field1_buffers); qd_buffer_list_t *field2_buffers = qd_compose_buffers(field2); + CHECK_Q2(*field2_buffers); qd_buffer_list_t *field3_buffers = qd_compose_buffers(field3); + CHECK_Q2(*field3_buffers); qd_buffer_list_t *field4_buffers = qd_compose_buffers(field4); - + CHECK_Q2(*field4_buffers); content->buffers = *field1_buffers; DEQ_INIT(*field1_buffers); DEQ_APPEND(content->buffers, (*field2_buffers)); @@ -2998,18 +3006,33 @@ bool qd_message_oversize(const qd_message_t *msg) } +int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props) +{ + qd_composed_field_t *field = 0; + int rc = 0; + + field = qd_compose(QD_PERFORMATIVE_FOOTER, field); + + // Stick the buffers into the footer compose field. + qd_compose_insert_binary_buffers(field, footer_props); + + rc = qd_message_extend(message, field, 0); + + qd_compose_free(field); + return rc; + +} + int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data, bool *q2_blocked) { unsigned int length = DEQ_SIZE(*data); + qd_composed_field_t *field = 0; int rc = 0; if (q2_blocked) *q2_blocked = false; - if (length == 0) - return rc; - // DISPATCH-1803: ensure no body data section can exceed the // QD_QLIMIT_Q2_LOWER. This allows the egress router to wait for an entire // body data section to arrive and be validated before sending it out to diff --git a/tests/http2_server.py b/tests/http2_server.py index c22c07b..0acb91d 100644 --- a/tests/http2_server.py +++ b/tests/http2_server.py @@ -108,5 +108,13 @@ async def get_jpg_images(): img_file = image_file("apache.jpg") return await send_file(img_file, mimetype='image/jpg') + +@app.route('/upload', methods=['POST']) +async def process_upload_data(): + for name, file in (await request.files).items(): + print(f'Processing {name}: {len(file.read())}') + return "Success!" + + #app.run(port=5000, certfile='cert.pem', keyfile='key.pem') app.run(port=os.getenv('SERVER_LISTEN_PORT')) diff --git a/tests/http2_slow_q2_server.py b/tests/http2_slow_q2_server.py new file mode 100644 index 0000000..6aa230d --- /dev/null +++ b/tests/http2_slow_q2_server.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import socket +import signal +import sys +import os +import h2.connection +import h2.events +import h2.config +import h2.errors + +BYTES = 16384 + + +def receive_signal(signalNumber, frame): + print('Received:', signalNumber) + sys.exit(0) + + +def send_response(event, conn): + """ + conn.close_connection() sends a goaway frame to the client + and closes the connection. + """ + stream_id = event.stream_id + conn.send_headers(stream_id=stream_id, + headers=[(u':status', u'200'), (u'server', u'h2_slow_q2_server/0.1.0')]) + conn.send_data(stream_id=stream_id, + data=b'Success!', + end_stream=True) + + +def handle_events(conn, events): + for event in events: + if isinstance(event, h2.events.DataReceived): + # When the server receives a DATA frame from the router, we send back a WINDOW_UPDATE frame + # with a window size increment of only 1k (1024 bytes) + # This pushes the router into q2 since it is able to only send two qd_buffers at a time. + conn.increment_flow_control_window(1024, None) + conn.increment_flow_control_window(1024, event.stream_id) + elif isinstance(event, h2.events.StreamEnded): + send_response(event, conn) + + +def handle(sock): + config = h2.config.H2Configuration(client_side=False) + + # The default initial window per HTTP2 spec is 64K. + # That means that the router is allowed to send only 64k before it needs more WINDOW_UPDATE frames + # providing more credit for the router to send more data. + conn = h2.connection.H2Connection(config=config) + conn.initiate_connection() + sock.sendall(conn.data_to_send()) + + while True: + data = None + try: + data = sock.recv(BYTES) + except: + pass + if not data: + break + try: + events = conn.receive_data(data) + except Exception as e: + print(e) + break + handle_events(conn, events) + data_to_send = conn.data_to_send() + if data_to_send: + sock.sendall(data_to_send) + + +signal.signal(signal.SIGHUP, receive_signal) +signal.signal(signal.SIGINT, receive_signal) +signal.signal(signal.SIGQUIT, receive_signal) +signal.signal(signal.SIGILL, receive_signal) +signal.signal(signal.SIGTERM, receive_signal) + +sock = socket.socket() +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.bind(('0.0.0.0', int(os.getenv('SERVER_LISTEN_PORT')))) +sock.listen(5) + +while True: + # The accept method blocks until someone attempts to connect to our TCP + # port: when they do, it returns a tuple: the first element is a new + # socket object, the second element is a tuple of the address the new + # connection is from + handle(sock.accept()[0]) diff --git a/tests/images/test.jpg b/tests/images/test.jpg new file mode 100644 index 0000000..9ef14a3 Binary files /dev/null and b/tests/images/test.jpg differ diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py index cca7a8e..7d09f6f 100644 --- a/tests/system_tests_http2.py +++ b/tests/system_tests_http2.py @@ -155,6 +155,15 @@ class CommonHttp2Tests: self.assertIn('Success! Your first name is John, last name is Doe', out) @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests") + def test_post_upload_large_image_jpg(self): + # curl -X POST -H "Content-Type: multipart/form-data" -F "data=@/home/gmurthy/opensource/test.jpg" + # http://127.0.0.1:9000/upload --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] + "/upload" + out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data', + '-F', 'data=@' + image_file('test.jpg')]) + self.assertIn('Success', out) + + @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests") def test_delete_request(self): # curl -X DELETE "http://127.0.0.1:9000/myinfo/delete/22122" -H "accept: application/json" --http2-prior-knowledge address = self.router_qdra.http_addresses[0] + "/myinfo/delete/22122" @@ -177,14 +186,14 @@ class CommonHttp2Tests: @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests") def test_404(self): - # Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge - address = self.router_qdra.http_addresses[0] + "/unavilable" - out = self.run_curl(address) + # Run curl 127.0.0.1:port/unavailable --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] + "/unavailable" + out = self.run_curl(address=address) self.assertIn('404 Not Found', out) @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests") def test_500(self): - # Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge + # Run curl 127.0.0.1:port/test/500 --http2-prior-knowledge address = self.router_qdra.http_addresses[0] + "/test/500" out = self.run_curl(address) self.assertIn('500 Internal Server Error', out) @@ -742,3 +751,126 @@ class Http2TestGoAway(Http2TestBase): address = self.router_qdra.http_addresses[0] + "/goaway_test_1" out = self.run_curl(address, args=["-i"]) self.assertIn("HTTP/2 503", out) + + +class Http2Q2OneRouterTest(Http2TestBase): + @classmethod + def setUpClass(cls): + super(Http2Q2OneRouterTest, cls).setUpClass() + if skip_h2_test(): + return + cls.http2_server_name = "http2_slow_q2_server" + os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port()) + cls.http2_server = cls.tester.http2server(name=cls.http2_server_name, + listen_port=int(os.getenv('SERVER_LISTEN_PORT')), + py_string='python3', + server_file="http2_slow_q2_server.py") + name = "http2-test-router" + cls.connector_name = 'connectorToServer' + cls.connector_props = { + 'port': os.getenv('SERVER_LISTEN_PORT'), + 'address': 'examples', + 'host': '127.0.0.1', + 'protocolVersion': 'HTTP2', + 'name': cls.connector_name + } + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'QDR'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), + + ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples', + 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}), + ('httpConnector', cls.connector_props) + ]) + cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True) + + @unittest.skipIf(skip_h2_test(), + "Python 3.7 or greater, hyper-h2 and curl needed to run hyperhttp2 tests") + def test_q2_block_unblock(self): + # curl -X POST -H "Content-Type: multipart/form-data" -F "data=@/home/gmurthy/opensource/test.jpg" + # http://127.0.0.1:9000/upload --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] + "/upload" + out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data', + '-F', 'data=@' + image_file('test.jpg')]) + self.assertIn('Success', out) + num_blocked = 0 + num_unblocked = 0 + blocked = "q2 is blocked" + unblocked = "q2 is unblocked" + with open(self.router_qdra.logfile_path, 'r') as router_log: + log_lines = router_log.read().split("\n") + for log_line in log_lines: + if unblocked in log_line: + num_unblocked += 1 + elif blocked in log_line: + num_blocked += 1 + + self.assertGreater(num_blocked, 0) + self.assertGreater(num_unblocked, 0) + + +class Http2Q2TwoRouterTest(Http2TestBase): + @classmethod + def setUpClass(cls): + super(Http2Q2TwoRouterTest, cls).setUpClass() + if skip_h2_test(): + return + cls.http2_server_name = "http2_server" + os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port()) + cls.http2_server = cls.tester.http2server(name=cls.http2_server_name, + listen_port=int(os.getenv('SERVER_LISTEN_PORT')), + py_string='python3', + server_file="http2_slow_q2_server.py") + qdr_a = "QDR.A" + inter_router_port = cls.tester.get_port() + config_qdra = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'QDR.A'}), + ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}), + ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples', + 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}), + ('connector', {'name': 'connectorToB', 'role': 'inter-router', + 'port': inter_router_port, + 'verifyHostname': 'no'}) + ]) + + qdr_b = "QDR.B" + cls.connector_name = 'serverConnector' + cls.http_connector_props = { + 'port': os.getenv('SERVER_LISTEN_PORT'), + 'address': 'examples', + 'host': '127.0.0.1', + 'protocolVersion': 'HTTP2', + 'name': cls.connector_name + } + config_qdrb = Qdrouterd.Config([ + ('router', {'mode': 'interior', 'id': 'QDR.B'}), + ('httpConnector', cls.http_connector_props), + ('listener', {'role': 'inter-router', 'maxSessionFrames': '10', 'port': inter_router_port}) + ]) + cls.router_qdrb = cls.tester.qdrouterd(qdr_b, config_qdrb, wait=True) + cls.router_qdra = cls.tester.qdrouterd(qdr_a, config_qdra, wait=True) + cls.router_qdra.wait_router_connected('QDR.B') + + @unittest.skipIf(skip_h2_test(), + "Python 3.7 or greater, hyper-h2 and curl needed to run hyperhttp2 tests") + def test_q2_block_unblock(self): + # curl -X POST -H "Content-Type: multipart/form-data" -F "data=@/home/gmurthy/opensource/test.jpg" + # http://127.0.0.1:9000/upload --http2-prior-knowledge + address = self.router_qdra.http_addresses[0] + "/upload" + out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data', + '-F', 'data=@' + image_file('test.jpg')]) + self.assertIn('Success', out) + num_blocked = 0 + num_unblocked = 0 + blocked = "q2 is blocked" + unblocked = "q2 is unblocked" + with open(self.router_qdra.logfile_path, 'r') as router_log: + log_lines = router_log.read().split("\n") + for log_line in log_lines: + if unblocked in log_line: + num_unblocked += 1 + elif blocked in log_line: + num_blocked += 1 + + self.assertGreater(num_blocked, 0) + self.assertGreater(num_unblocked, 0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org