This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by 
this push:
     new 042a647  DISPATCH-1852: Added code to accumulate DATA frames in the 
message body in case credit does not arrive and the header has not been routed. 
This closes #926
042a647 is described below

commit 042a647e3e6df0a7871c9f3d194e64dd95bc07a1
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Fri Nov 20 10:02:41 2020 -0500

    DISPATCH-1852: Added code to accumulate DATA frames in the message body in 
case credit does not arrive and the header has not been routed. This closes #926
---
 src/adaptors/http2/http2_adaptor.c | 112 +++++++++++++++++++++----------------
 src/adaptors/http2/http2_adaptor.h |   1 +
 2 files changed, 64 insertions(+), 49 deletions(-)

diff --git a/src/adaptors/http2/http2_adaptor.c 
b/src/adaptors/http2/http2_adaptor.c
index a2ee4fe..85760de 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -414,14 +414,23 @@ static int on_data_chunk_recv_callback(nghttp2_session 
*session,
         return 0;
 
     stream_data->bytes_in += len;
-
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
     qd_buffer_list_append(&buffers, (uint8_t *)data, len);
 
-    if (stream_data->in_dlv) {
-        if (!stream_data->in_dlv_released)
+    //
+    // DISPATCH-1852: If an in_dlv is present, it means that qdr_link_deliver() has already been called (the delivery has already been routed),
+    // in which case qd_message_stream_data_append() can be called to append buffers to the message body.
+    // If stream_data->in_dlv is 0 but stream_data->header_and_props_composed is true, it means that the message has not been routed yet
+    // but the message already has its headers and properties,
+    // in which case qd_message_stream_data_append() can also be called to add body data to the message.
+    // In many cases, when the response message is streamed by a server, the entire message body can arrive before we get credit to route it.
+    // We want to be able to keep collecting the incoming DATA in the message object so we can route it when the credit ultimately arrives.
+    //
+    if (stream_data->in_dlv || stream_data->header_and_props_composed) {
+        if (!stream_data->in_dlv_released) {
             qd_message_stream_data_append(stream_data->message, &buffers);
+        }
     }
     else {
         if (!stream_data->body) {
@@ -655,29 +664,61 @@ static int on_header_callback(nghttp2_session *session,
 }
 
 
-static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, 
qd_composed_field_t  *header_and_props, qdr_http2_connection_t *conn, bool 
receive_complete)
+static bool compose_and_deliver(qdr_http2_connection_t *conn, 
qdr_http2_stream_data_t *stream_data, bool receive_complete)
 {
-    if (receive_complete) {
-        if (!stream_data->body) {
-            stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-            qd_compose_insert_binary(stream_data->body, 0, 0);
-            qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", 
conn->conn_id, stream_data->stream_id);
+    if (!stream_data->header_and_props_composed) {
+        qd_composed_field_t  *header_and_props = 0;
+        if (conn->ingress) {
+            header_and_props = qd_message_compose_amqp(stream_data->message,
+                                                  conn->config->address,  // 
const char *to
+                                                  0,                      // 
const char *subject
+                                                  stream_data->reply_to,  // 
const char *reply_to
+                                                  0,                      // 
const char *content_type
+                                                  0,                      // 
const char *content_encoding
+                                                  0,                      // 
int32_t  correlation_id
+                                                  conn->config->site);
         }
-    }
-    if (stream_data->body) {
-        qd_message_compose_4(stream_data->message, header_and_props, 
stream_data->app_properties, stream_data->body, receive_complete);
-    }
-    else {
-        qd_message_compose_3(stream_data->message, header_and_props, 
stream_data->app_properties, receive_complete);
+        else {
+            header_and_props = qd_message_compose_amqp(stream_data->message,
+                                                  stream_data->reply_to,  // 
const char *to
+                                                  0,                      // 
const char *subject
+                                                  0,                      // 
const char *reply_to
+                                                  0,                      // 
const char *content_type
+                                                  0,                      // 
const char *content_encoding
+                                                  0,                      // 
int32_t  correlation_id
+                                                  conn->config->site);
+        }
+
+        if (receive_complete) {
+            if (!stream_data->body) {
+                stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
+                qd_compose_insert_binary(stream_data->body, 0, 0);
+                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", 
conn->conn_id, stream_data->stream_id);
+            }
+        }
+        if (stream_data->body) {
+            qd_message_compose_4(stream_data->message, header_and_props, 
stream_data->app_properties, stream_data->body, receive_complete);
+        }
+        else {
+            qd_message_compose_3(stream_data->message, header_and_props, 
stream_data->app_properties, receive_complete);
+        }
+
+        // The header and properties have been added. Now we can start adding 
BODY DATA to this message.
+        stream_data->header_and_props_composed = true;
+        qd_compose_free(header_and_props);
     }
     qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver in 
compose_and_deliver", conn->conn_id, stream_data->stream_id, 
stream_data->in_link->identity);
 
     if (!stream_data->in_dlv && stream_data->in_link_credit > 0) {
         stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, 
stream_data->message, 0, false, 0, 0, 0, 0);
-        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery in compose_and_deliver 
dlv:%lx", stream_data->session_data->conn->conn_id, stream_data->stream_id, 
stream_data->in_link->identity, (long) stream_data->in_dlv);
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery in compose_and_deliver 
dlv:%lx", conn->conn_id, stream_data->stream_id, 
stream_data->in_link->identity, (long) stream_data->in_dlv);
         qdr_delivery_set_context(stream_data->in_dlv, stream_data);
         qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, 
"http2_adaptor - compose_and_deliver - release protection of return from 
deliver");
         stream_data->in_link_credit -= 1;
+        return true;
+    }
+    else {
+        return false;
     }
 }
 
@@ -689,44 +730,17 @@ static bool route_delivery(qdr_http2_stream_data_t 
*stream_data, bool receive_co
         return false;
     }
 
-    qd_composed_field_t  *header_and_props = 0;
-
-    if (stream_data->in_link_credit == 0) {
-        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] No credit on in_link, not routing delivery", 
conn->conn_id, stream_data->stream_id);
-        return false;
-    }
-
     bool delivery_routed = false;
 
     if (conn->ingress) {
         if (stream_data->reply_to && stream_data->entire_header_arrived && 
!stream_data->in_dlv) {
-            header_and_props = qd_message_compose_amqp(stream_data->message,
-                                                  conn->config->address,  // 
const char *to
-                                                  0,                      // 
const char *subject
-                                                  stream_data->reply_to,  // 
const char *reply_to
-                                                  0,                      // 
const char *content_type
-                                                  0,                      // 
const char *content_encoding
-                                                  0,                      // 
int32_t  correlation_id
-                                                  conn->config->site);
-            compose_and_deliver(stream_data, header_and_props, conn, 
receive_complete);
-            qd_compose_free(header_and_props);
-            delivery_routed = true;
+            delivery_routed = compose_and_deliver(conn, stream_data, 
receive_complete);
         }
     }
     else {
-        if (stream_data->entire_header_arrived) {
-            header_and_props = qd_message_compose_amqp(stream_data->message,
-                                                  stream_data->reply_to,  // 
const char *to
-                                                  0,                      // 
const char *subject
-                                                  0,                      // 
const char *reply_to
-                                                  0,                      // 
const char *content_type
-                                                  0,                      // 
const char *content_encoding
-                                                  0,                      // 
int32_t  correlation_id
-                                                  conn->config->site);
+        if (stream_data->entire_header_arrived && !stream_data->in_dlv) {
             qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Calling compose_and_deliver, routing delivery", 
conn->conn_id, stream_data->stream_id);
-            compose_and_deliver(stream_data, header_and_props, conn, 
receive_complete);
-            qd_compose_free(header_and_props);
-            delivery_routed = true;
+            delivery_routed = compose_and_deliver(conn, stream_data, 
receive_complete);
         }
     }
 
@@ -841,9 +855,9 @@ static int on_frame_recv_callback(nghttp2_session *session,
         qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA frame received", conn->conn_id, 
stream_id);
 
         if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, 
receive_complete = true", conn->conn_id, stream_id);
             if (!stream_data->in_dlv_released) {
                 qd_message_set_receive_complete(stream_data->message);
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA NGHTTP2_FLAG_END_STREAM flag received, 
setting receive_complete = true", conn->conn_id, stream_id);
             }
             advance_stream_status(stream_data);
 
@@ -1273,7 +1287,7 @@ static void qdr_http_flow(void *context, qdr_link_t 
*link, int credit)
         stream_data->in_link_credit += credit;
         if (!stream_data->in_dlv) {
             if (route_delivery(stream_data, 
qd_message_receive_complete(stream_data->message))) {
-                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
qdr_http_flow, delivery routed successfully", 
stream_data->session_data->conn->conn_id);
+                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] qdr_http_flow, delivery routed successfully", 
stream_data->session_data->conn->conn_id, stream_data->stream_id);
             }
         }
     }
@@ -1448,7 +1462,7 @@ static void qdr_http_activate(void *notused, 
qdr_connection_t *c)
     qdr_http2_connection_t* conn = (qdr_http2_connection_t*) 
qdr_connection_get_context(c);
     if (conn) {
         if (conn->pn_raw_conn) {
-            qd_log(http2_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] 
Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
+            qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
         }
         else if (conn->activate_timer) {
diff --git a/src/adaptors/http2/http2_adaptor.h 
b/src/adaptors/http2/http2_adaptor.h
index b900bad..c5adfc6 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -105,6 +105,7 @@ struct qdr_http2_stream_data_t {
     bool                     disp_updated;   // Has the disposition already 
been set on the out_dlv
     bool                     disp_applied;   // Has the disp been applied to 
the out_dlv. The stream is ready to be freed now.
     bool                     in_dlv_released;
+    bool                     header_and_props_composed;  // true if the header 
and properties of the inbound message have already been composed so we don't 
have to do it again.
 
     //for stats:
     char                    *method;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to