This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 85efbd7 DISPATCH-2218: Removed use of variable length arrays. Used the stream data iterator API. This closes #1328 85efbd7 is described below commit 85efbd7ca4343a6f7348d7c10a160b78c64070fc Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Mon Jul 26 11:49:07 2021 -0400 DISPATCH-2218: Removed use of variable length arrays. Used the stream data iterator API. This closes #1328 --- src/adaptors/http2/http2_adaptor.c | 121 +++++++++++++------------------------ src/adaptors/http2/http2_adaptor.h | 6 +- 2 files changed, 44 insertions(+), 83 deletions(-) diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index d7f2024..648f510 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -66,6 +66,7 @@ typedef struct qdr_http2_adaptor_t { static qdr_http2_adaptor_t *http2_adaptor; const int32_t WINDOW_SIZE = 65536; +const int32_t MAX_FRAME_SIZE = 16384; static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void *context); static void _http_record_request(qdr_http2_connection_t *conn, qdr_http2_stream_data_t *stream_data); @@ -395,7 +396,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on // if (stream_data->in_dlv && !stream_data->in_dlv_decrefed) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); stream_data->curr_stream_data = 0; + stream_data->curr_stream_data_iter = 0; qd_message_stream_data_release(stream_data->next_stream_data); stream_data->next_stream_data = 0; @@ -405,7 +408,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on if (stream_data->out_dlv && !stream_data->out_dlv_decrefed) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); stream_data->curr_stream_data = 0; + stream_data->curr_stream_data_iter = 0; qd_message_stream_data_release(stream_data->next_stream_data); stream_data->next_stream_data = 0; @@ -625,48 +630,16 @@ static int snd_data_callback(nghttp2_session *session, // Insert the framehd of length 9 bytes into the buffer memcpy(qd_http2_buffer_cursor(http2_buff), framehd, HTTP2_DATA_FRAME_HEADER_LENGTH); qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH); - pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send]; - int written = qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, stream_data->curr_stream_data_qd_buff_offset, stream_data->qd_buffers_to_send); - (void)written; - assert (written == stream_data->qd_buffers_to_send); - - int idx = 0; - size_t bytes_to_send = length; - - while (idx < stream_data->qd_buffers_to_send) { - if (pn_raw_buffs[idx].size > 0) { - if (bytes_to_send < pn_raw_buffs[idx].size) { - int bytes_remaining_in_buffer = pn_raw_buffs[idx].size - stream_data->curr_stream_data_offset; - if (bytes_remaining_in_buffer < bytes_to_send) { - memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, bytes_remaining_in_buffer); - qd_http2_buffer_insert(http2_buff, bytes_remaining_in_buffer); - stream_data->curr_stream_data_offset = 0; - bytes_to_send -= bytes_remaining_in_buffer; - bytes_sent += bytes_remaining_in_buffer; - } - else { - memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, bytes_to_send); - qd_http2_buffer_insert(http2_buff, bytes_to_send); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send); - stream_data->curr_stream_data_offset += bytes_to_send; - bytes_sent += bytes_to_send; - if (stream_data->curr_stream_data_offset == BUFFER_SIZE || stream_data->curr_stream_data_offset == pn_raw_buffs[idx].size) { - stream_data->curr_stream_data_offset = 0; - stream_data->curr_stream_data_qd_buff_offset += 1; - } - } - } - else { - memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size); - qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%u", conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size); - stream_data->curr_stream_data_qd_buff_offset += 1; - bytes_to_send -= pn_raw_buffs[idx].size; - bytes_sent += pn_raw_buffs[idx].size; - } - } - idx += 1; - } + + uint32_t octets_remaining = qd_iterator_remaining(stream_data->curr_stream_data_iter); + + size_t len = MIN(octets_remaining, length); + int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, qd_http2_buffer_cursor(http2_buff), len); + assert(copied == len); + qd_http2_buffer_insert(http2_buff, len); + octets_remaining -= copied; + bytes_sent += copied; + qd_iterator_trim_view(stream_data->curr_stream_data_iter, octets_remaining); } else if (length == 0 && stream_data->out_msg_data_flag_eof) { write_buffs = true; @@ -680,14 +653,14 @@ static int snd_data_callback(nghttp2_session *session, if (stream_data->full_payload_handled) { if (!stream_data->out_msg_has_footer && stream_data->curr_stream_data) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); stream_data->curr_stream_data = 0; + stream_data->curr_stream_data_iter = 0; qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, no footer, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id); } else { qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, out_msg_has_footer", conn->conn_id, stream_data->stream_id); } - stream_data->curr_stream_data_offset = 0; - stream_data->curr_stream_data_qd_buff_offset = 0; stream_data->payload_handled = 0; } else { @@ -996,8 +969,9 @@ static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_co static void create_settings_frame(qdr_http2_connection_t *conn) { qdr_http2_session_data_t *session_data = conn->session_data; - nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}, + nghttp2_settings_entry iv[4] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 100}, {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, WINDOW_SIZE}, + {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, MAX_FRAME_SIZE}, {NGHTTP2_SETTINGS_ENABLE_PUSH, 0}}; // You must call nghttp2_session_send after calling nghttp2_submit_settings @@ -1203,24 +1177,6 @@ static int on_frame_recv_callback(nghttp2_session *session, return 0; } -static void set_buffers_to_send(qdr_http2_stream_data_t *stream_data, size_t bytes_to_send) -{ - size_t diff = qd_message_stream_data_buffer_count(stream_data->curr_stream_data) - stream_data->curr_stream_data_qd_buff_offset; - pn_raw_buffer_t pn_raw_buffs[diff]; - int written = qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, stream_data->curr_stream_data_qd_buff_offset, diff); - assert(diff == written); - int idx = 0; - int rolling_count = 0; - rolling_count -= stream_data->curr_stream_data_offset; - while (idx < written) { - rolling_count += pn_raw_buffs[idx].size; - idx+=1; - if (rolling_count >= bytes_to_send) - break; - } - stream_data->qd_buffers_to_send = idx; -} - ssize_t read_data_callback(nghttp2_session *session, int32_t stream_id, uint8_t *buf, @@ -1248,6 +1204,8 @@ ssize_t read_data_callback(nghttp2_session *session, if (stream_data->next_stream_data) { stream_data->curr_stream_data = stream_data->next_stream_data; + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = qd_message_stream_data_iterator(stream_data->curr_stream_data); stream_data->curr_stream_data_result = stream_data->next_stream_data_result; stream_data->next_stream_data = 0; qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_stream_data", conn->conn_id, stream_data->stream_id); @@ -1255,6 +1213,10 @@ ssize_t read_data_callback(nghttp2_session *session, if (!stream_data->curr_stream_data) { stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data); + if (stream_data->curr_stream_data) { + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = qd_message_stream_data_iterator(stream_data->curr_stream_data); + } qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_stream_data", conn->conn_id, stream_data->stream_id); } @@ -1291,6 +1253,8 @@ ssize_t read_data_callback(nghttp2_session *session, if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { if (!stream_data->out_msg_has_footer) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = 0; stream_data->curr_stream_data = 0; } @@ -1311,6 +1275,8 @@ ssize_t read_data_callback(nghttp2_session *session, } else { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = 0; stream_data->curr_stream_data = 0; } @@ -1330,14 +1296,12 @@ ssize_t read_data_callback(nghttp2_session *session, if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) { if (length < remaining_payload_length) { bytes_to_send = length; - set_buffers_to_send(stream_data, bytes_to_send); stream_data->full_payload_handled = false; } else { bytes_to_send = remaining_payload_length; - set_buffers_to_send(stream_data, bytes_to_send); stream_data->full_payload_handled = true; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%i) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%i) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send); // Look ahead one body data stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); @@ -1355,18 +1319,11 @@ ssize_t read_data_callback(nghttp2_session *session, } } else { - if (length < remaining_payload_length) { - bytes_to_send = length; - set_buffers_to_send(stream_data, bytes_to_send); - } - else { - // This means that there is more that 16k worth of payload in one body data. - // We want to send only 16k data per read_data_callback - bytes_to_send = QD_HTTP2_BUFFER_SIZE; - set_buffers_to_send(stream_data, bytes_to_send); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send, stream_data->qd_buffers_to_send); - } - stream_data->full_payload_handled = false; + // This means that there is more that 16k worth of payload in one body data. + // We want to send only 16k data per read_data_callback + bytes_to_send = QD_HTTP2_BUFFER_SIZE; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, bytes_to_send); + stream_data->full_payload_handled = false; } } @@ -1410,7 +1367,6 @@ ssize_t read_data_callback(nghttp2_session *session, return NGHTTP2_ERR_DEFERRED; } else { - stream_data->qd_buffers_to_send = 0; *data_flags |= NGHTTP2_DATA_FLAG_EOF; stream_data->out_msg_data_flag_eof = true; if (stream_data->out_msg_has_footer) { @@ -1442,6 +1398,8 @@ ssize_t read_data_callback(nghttp2_session *session, stream_data->out_msg_data_flag_eof = true; if (stream_data->curr_stream_data) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = 0; stream_data->curr_stream_data = 0; } stream_data->out_dlv_local_disposition = PN_REJECTED; @@ -1876,6 +1834,9 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id); } } + else { + stream_data->curr_stream_data_iter = qd_message_stream_data_iterator(stream_data->curr_stream_data); + } } stream_data->stream_id = nghttp2_submit_headers(session_data->session, @@ -2010,6 +1971,8 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) qd_parse_free(footer_properties_fld); if (stream_data->curr_stream_data) { qd_message_stream_data_release(stream_data->curr_stream_data); + qd_iterator_free(stream_data->curr_stream_data_iter); + stream_data->curr_stream_data_iter = 0; stream_data->curr_stream_data = 0; } if (stream_data->next_stream_data) { diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index 9373066..78894af 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -85,18 +85,16 @@ struct qdr_http2_stream_data_t { qd_composed_field_t *footer_properties; qd_buffer_list_t body_buffers; qd_message_stream_data_t *curr_stream_data; + qd_iterator_t *curr_stream_data_iter; // points to the data contained in the stream_data/raw_buffers qd_message_stream_data_t *next_stream_data; qd_message_stream_data_t *footer_stream_data; DEQ_LINKS(qdr_http2_stream_data_t); qd_message_stream_data_result_t curr_stream_data_result; qd_message_stream_data_result_t next_stream_data_result; - int curr_stream_data_qd_buff_offset; - int curr_stream_data_offset; // The offset within the qd_buffer so we can jump there. - int payload_handled; + int payload_handled; int in_link_credit; // provided by router int32_t stream_id; - size_t qd_buffers_to_send; qd_http2_stream_status_t status; bool entire_footer_arrived; bool entire_header_arrived; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org