This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 85efbd7  DISPATCH-2218: Removed use of variable length arrays. Used the stream data iterator API. This closes #1328
85efbd7 is described below

commit 85efbd7ca4343a6f7348d7c10a160b78c64070fc
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Mon Jul 26 11:49:07 2021 -0400

    DISPATCH-2218: Removed use of variable length arrays. Used the stream data iterator API. This closes #1328
---
 src/adaptors/http2/http2_adaptor.c | 121 +++++++++++++------------------------
 src/adaptors/http2/http2_adaptor.h |   6 +-
 2 files changed, 44 insertions(+), 83 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c 
b/src/adaptors/http2/http2_adaptor.c
index d7f2024..648f510 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -66,6 +66,7 @@ typedef struct qdr_http2_adaptor_t {
 
 static qdr_http2_adaptor_t *http2_adaptor;
 const int32_t WINDOW_SIZE = 65536;
+const int32_t MAX_FRAME_SIZE = 16384;
 
 static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, 
void *context);
 static void _http_record_request(qdr_http2_connection_t *conn, 
qdr_http2_stream_data_t *stream_data);
@@ -395,7 +396,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t 
*stream_data, bool on
     //
     if (stream_data->in_dlv && !stream_data->in_dlv_decrefed) {
         qd_message_stream_data_release(stream_data->curr_stream_data);
+        qd_iterator_free(stream_data->curr_stream_data_iter);
         stream_data->curr_stream_data = 0;
+        stream_data->curr_stream_data_iter = 0;
 
         qd_message_stream_data_release(stream_data->next_stream_data);
         stream_data->next_stream_data = 0;
@@ -405,7 +408,9 @@ static void free_http2_stream_data(qdr_http2_stream_data_t 
*stream_data, bool on
 
     if (stream_data->out_dlv && !stream_data->out_dlv_decrefed) {
         qd_message_stream_data_release(stream_data->curr_stream_data);
+        qd_iterator_free(stream_data->curr_stream_data_iter);
         stream_data->curr_stream_data = 0;
+        stream_data->curr_stream_data_iter = 0;
 
         qd_message_stream_data_release(stream_data->next_stream_data);
         stream_data->next_stream_data = 0;
@@ -625,48 +630,16 @@ static int snd_data_callback(nghttp2_session *session,
         // Insert the framehd of length 9 bytes into the buffer
         memcpy(qd_http2_buffer_cursor(http2_buff), framehd, 
HTTP2_DATA_FRAME_HEADER_LENGTH);
         qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
-        pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
-        int written = 
qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 
stream_data->curr_stream_data_qd_buff_offset, stream_data->qd_buffers_to_send);
-        (void)written;
-        assert (written == stream_data->qd_buffers_to_send);
-
-        int idx = 0;
-        size_t bytes_to_send = length;
-
-        while (idx < stream_data->qd_buffers_to_send) {
-            if (pn_raw_buffs[idx].size > 0) {
-               if (bytes_to_send < pn_raw_buffs[idx].size) {
-                       int bytes_remaining_in_buffer = pn_raw_buffs[idx].size 
- stream_data->curr_stream_data_offset;
-                       if (bytes_remaining_in_buffer < bytes_to_send) {
-                               memcpy(qd_http2_buffer_cursor(http2_buff), 
pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, 
bytes_remaining_in_buffer);
-                               qd_http2_buffer_insert(http2_buff, 
bytes_remaining_in_buffer);
-                               stream_data->curr_stream_data_offset = 0;
-                               bytes_to_send -= bytes_remaining_in_buffer;
-                               bytes_sent += bytes_remaining_in_buffer;
-                       }
-                       else {
-                                               
memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + 
stream_data->curr_stream_data_offset, bytes_to_send);
-                                               
qd_http2_buffer_insert(http2_buff, bytes_to_send);
-                                               
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_to_send=%zu", 
conn->conn_id, stream_data->stream_id, bytes_to_send);
-                                               
stream_data->curr_stream_data_offset += bytes_to_send;
-                                               bytes_sent += bytes_to_send;
-                                               if 
(stream_data->curr_stream_data_offset == BUFFER_SIZE || 
stream_data->curr_stream_data_offset == pn_raw_buffs[idx].size) {
-                                                       
stream_data->curr_stream_data_offset = 0;
-                                                       
stream_data->curr_stream_data_qd_buff_offset += 1;
-                                               }
-                       }
-               }
-               else {
-                    memcpy(qd_http2_buffer_cursor(http2_buff), 
pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
-                    qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%u", 
conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
-                    stream_data->curr_stream_data_qd_buff_offset += 1;
-                    bytes_to_send -= pn_raw_buffs[idx].size;
-                    bytes_sent += pn_raw_buffs[idx].size;
-               }
-            }
-            idx += 1;
-        }
+
+        uint32_t octets_remaining = 
qd_iterator_remaining(stream_data->curr_stream_data_iter);
+
+        size_t len = MIN(octets_remaining, length);
+        int copied = qd_iterator_ncopy(stream_data->curr_stream_data_iter, 
qd_http2_buffer_cursor(http2_buff), len);
+        assert(copied == len);
+        qd_http2_buffer_insert(http2_buff, len);
+        octets_remaining -= copied;
+        bytes_sent += copied;
+        qd_iterator_trim_view(stream_data->curr_stream_data_iter, 
octets_remaining);
     }
     else if (length == 0 && stream_data->out_msg_data_flag_eof) {
         write_buffs = true;
@@ -680,14 +653,14 @@ static int snd_data_callback(nghttp2_session *session,
     if (stream_data->full_payload_handled) {
         if (!stream_data->out_msg_has_footer && stream_data->curr_stream_data) 
{
             qd_message_stream_data_release(stream_data->curr_stream_data);
+            qd_iterator_free(stream_data->curr_stream_data_iter);
             stream_data->curr_stream_data = 0;
+            stream_data->curr_stream_data_iter = 0;
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, no footer, 
qd_message_stream_data_release", conn->conn_id, stream_data->stream_id);
         }
         else {
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, 
out_msg_has_footer", conn->conn_id, stream_data->stream_id);
         }
-        stream_data->curr_stream_data_offset = 0;
-        stream_data->curr_stream_data_qd_buff_offset = 0;
         stream_data->payload_handled = 0;
     }
     else {
@@ -996,8 +969,9 @@ static bool route_delivery(qdr_http2_stream_data_t 
*stream_data, bool receive_co
 static void create_settings_frame(qdr_http2_connection_t *conn)
 {
     qdr_http2_session_data_t *session_data = conn->session_data;
-    nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 
100},
+    nghttp2_settings_entry iv[4] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 
100},
                                     {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 
WINDOW_SIZE},
+                                    {NGHTTP2_SETTINGS_MAX_FRAME_SIZE, 
MAX_FRAME_SIZE},
                                     {NGHTTP2_SETTINGS_ENABLE_PUSH, 0}};
 
     // You must call nghttp2_session_send after calling nghttp2_submit_settings
@@ -1203,24 +1177,6 @@ static int on_frame_recv_callback(nghttp2_session 
*session,
     return 0;
 }
 
-static void set_buffers_to_send(qdr_http2_stream_data_t *stream_data, size_t 
bytes_to_send)
-{
-       size_t diff = 
qd_message_stream_data_buffer_count(stream_data->curr_stream_data) - 
stream_data->curr_stream_data_qd_buff_offset;
-    pn_raw_buffer_t pn_raw_buffs[diff];
-    int written = 
qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 
stream_data->curr_stream_data_qd_buff_offset, diff);
-    assert(diff == written);
-    int idx = 0;
-    int rolling_count = 0;
-    rolling_count -= stream_data->curr_stream_data_offset;
-    while (idx < written) {
-       rolling_count += pn_raw_buffs[idx].size;
-       idx+=1;
-       if (rolling_count >= bytes_to_send)
-               break;
-    }
-    stream_data->qd_buffers_to_send = idx;
-}
-
 ssize_t read_data_callback(nghttp2_session *session,
                       int32_t stream_id,
                       uint8_t *buf,
@@ -1248,6 +1204,8 @@ ssize_t read_data_callback(nghttp2_session *session,
 
         if (stream_data->next_stream_data) {
             stream_data->curr_stream_data = stream_data->next_stream_data;
+            qd_iterator_free(stream_data->curr_stream_data_iter);
+            stream_data->curr_stream_data_iter = 
qd_message_stream_data_iterator(stream_data->curr_stream_data);
             stream_data->curr_stream_data_result = 
stream_data->next_stream_data_result;
             stream_data->next_stream_data = 0;
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_stream_data", 
conn->conn_id, stream_data->stream_id);
@@ -1255,6 +1213,10 @@ ssize_t read_data_callback(nghttp2_session *session,
 
         if (!stream_data->curr_stream_data) {
             stream_data->curr_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->curr_stream_data);
+            if (stream_data->curr_stream_data) {
+                qd_iterator_free(stream_data->curr_stream_data_iter);
+                stream_data->curr_stream_data_iter = 
qd_message_stream_data_iterator(stream_data->curr_stream_data);
+            }
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get 
qd_message_next_stream_data", conn->conn_id, stream_data->stream_id);
         }
 
@@ -1291,6 +1253,8 @@ ssize_t read_data_callback(nghttp2_session *session,
                 if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_NO_MORE) {
                     if (!stream_data->out_msg_has_footer) {
                         
qd_message_stream_data_release(stream_data->curr_stream_data);
+                        qd_iterator_free(stream_data->curr_stream_data_iter);
+                        stream_data->curr_stream_data_iter = 0;
                         stream_data->curr_stream_data = 0;
                     }
 
@@ -1311,6 +1275,8 @@ ssize_t read_data_callback(nghttp2_session *session,
                 }
                 else {
                     
qd_message_stream_data_release(stream_data->curr_stream_data);
+                    qd_iterator_free(stream_data->curr_stream_data_iter);
+                    stream_data->curr_stream_data_iter = 0;
                     stream_data->curr_stream_data = 0;
                 }
 
@@ -1330,14 +1296,12 @@ ssize_t read_data_callback(nghttp2_session *session,
                 if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) {
                        if (length < remaining_payload_length) {
                                bytes_to_send = length;
-                               set_buffers_to_send(stream_data, bytes_to_send);
                                stream_data->full_payload_handled = false;
                        }
                        else {
                                bytes_to_send = remaining_payload_length;
-                        set_buffers_to_send(stream_data, bytes_to_send);
                                stream_data->full_payload_handled = true;
-                               qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback 
remaining_payload_length (%i) <= QD_HTTP2_BUFFER_SIZE(16384), 
bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, 
stream_data->stream_id, remaining_payload_length, bytes_to_send, 
stream_data->qd_buffers_to_send);
+                               qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback 
remaining_payload_length (%i) <= QD_HTTP2_BUFFER_SIZE(16384), 
bytes_to_send=%zu", conn->conn_id, stream_data->stream_id, 
remaining_payload_length, bytes_to_send);
 
                         // Look ahead one body data
                         stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
@@ -1355,18 +1319,11 @@ ssize_t read_data_callback(nghttp2_session *session,
                        }
                 }
                 else {
-                       if (length < remaining_payload_length) {
-                               bytes_to_send = length;
-                               set_buffers_to_send(stream_data, bytes_to_send);
-                    }
-                       else {
-                                               // This means that there is 
more that 16k worth of payload in one body data.
-                                               // We want to send only 16k 
data per read_data_callback
-                                               bytes_to_send = 
QD_HTTP2_BUFFER_SIZE;
-                                               
set_buffers_to_send(stream_data, bytes_to_send);
-                                               
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= 
QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, 
stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, 
bytes_to_send, stream_data->qd_buffers_to_send);
-                       }
-                       stream_data->full_payload_handled = false;
+                    // This means that there is more than 16k worth of payload in one body data.
+                    // We want to send only 16k data per read_data_callback
+                    bytes_to_send = QD_HTTP2_BUFFER_SIZE;
+                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= 
QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu", conn->conn_id, 
stream_data->stream_id, bytes_to_send);
+                    stream_data->full_payload_handled = false;
                 }
             }
 
@@ -1410,7 +1367,6 @@ ssize_t read_data_callback(nghttp2_session *session,
                 return NGHTTP2_ERR_DEFERRED;
             }
             else {
-                stream_data->qd_buffers_to_send = 0;
                 *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                 stream_data->out_msg_data_flag_eof = true;
                 if (stream_data->out_msg_has_footer) {
@@ -1442,6 +1398,8 @@ ssize_t read_data_callback(nghttp2_session *session,
             stream_data->out_msg_data_flag_eof = true;
             if (stream_data->curr_stream_data) {
                 qd_message_stream_data_release(stream_data->curr_stream_data);
+                qd_iterator_free(stream_data->curr_stream_data_iter);
+                stream_data->curr_stream_data_iter = 0;
                stream_data->curr_stream_data = 0;
             }
             stream_data->out_dlv_local_disposition = PN_REJECTED;
@@ -1876,6 +1834,9 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
                         qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"] Message has no body, sending 
NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id);
                     }
                 }
+                else {
+                    stream_data->curr_stream_data_iter = 
qd_message_stream_data_iterator(stream_data->curr_stream_data);
+                }
             }
 
             stream_data->stream_id = 
nghttp2_submit_headers(session_data->session,
@@ -2010,6 +1971,8 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
                 qd_parse_free(footer_properties_fld);
                 if (stream_data->curr_stream_data) {
                     
qd_message_stream_data_release(stream_data->curr_stream_data);
+                    qd_iterator_free(stream_data->curr_stream_data_iter);
+                    stream_data->curr_stream_data_iter = 0;
                     stream_data->curr_stream_data = 0;
                 }
                 if (stream_data->next_stream_data) {
diff --git a/src/adaptors/http2/http2_adaptor.h 
b/src/adaptors/http2/http2_adaptor.h
index 9373066..78894af 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -85,18 +85,16 @@ struct qdr_http2_stream_data_t {
     qd_composed_field_t      *footer_properties;
     qd_buffer_list_t          body_buffers;
     qd_message_stream_data_t *curr_stream_data;
+    qd_iterator_t            *curr_stream_data_iter; // points to the data 
contained in the stream_data/raw_buffers
     qd_message_stream_data_t *next_stream_data;
     qd_message_stream_data_t *footer_stream_data;
     DEQ_LINKS(qdr_http2_stream_data_t);
 
     qd_message_stream_data_result_t  curr_stream_data_result;
     qd_message_stream_data_result_t  next_stream_data_result;
-    int                            curr_stream_data_qd_buff_offset;
-    int                            curr_stream_data_offset; // The offset 
within the qd_buffer so we can jump there.
-    int                                                   payload_handled;
+    int                            payload_handled;
     int                            in_link_credit;   // provided by router
     int32_t                        stream_id;
-    size_t                         qd_buffers_to_send;
     qd_http2_stream_status_t       status;
     bool                     entire_footer_arrived;
     bool                     entire_header_arrived;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to