This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b96ceb  DISPATCH-1940, DISPATCH-2198: Added code to handle http2 and 
q2 flow control window updates. This closes #1301
1b96ceb is described below

commit 1b96ceb8818ae77949275f1d9616e07ca51ce5a0
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Mon Jun 7 15:49:37 2021 -0400

    DISPATCH-1940, DISPATCH-2198: Added code to handle http2 and q2 flow 
control window updates. This closes #1301
---
 include/qpid/dispatch/message.h    |  12 +
 src/adaptors/http2/http2_adaptor.c | 438 +++++++++++++++++++++++++------------
 src/adaptors/http2/http2_adaptor.h |  11 +-
 src/message.c                      |  31 ++-
 tests/http2_server.py              |   8 +
 tests/http2_slow_q2_server.py      | 107 +++++++++
 tests/images/test.jpg              | Bin 0 -> 9634665 bytes
 tests/system_tests_http2.py        | 140 +++++++++++-
 8 files changed, 593 insertions(+), 154 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 6072e6d..4297b9c 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -416,6 +416,18 @@ qd_message_stream_data_result_t 
qd_message_next_stream_data(qd_message_t *msg, q
 
 
 /**
+ * qd_message_stream_data_footer_append
+ *
+ * Constructs a footer field by calling qd_compose(QD_PERFORMATIVE_FOOTER, 
field).
+ * It then inserts the passed-in buffer list into the composed field and 
proceeds to disable Q2 before finally adding the footer
+ * field to the message.
+ *
+ * Use this function if you have the complete footer data available in the 
passed-in buffer list.
+ */
+int qd_message_stream_data_footer_append(qd_message_t *message, 
qd_buffer_list_t *footer_props);
+
+
+/**
  * qd_message_stream_data_append
  *
  * Append the buffers in data as a sequence of one or more BODY_DATA sections
diff --git a/src/adaptors/http2/http2_adaptor.c 
b/src/adaptors/http2/http2_adaptor.c
index 1f3aaf4..b5d5f61 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -65,6 +65,7 @@ typedef struct qdr_http2_adaptor_t {
 
 
 static qdr_http2_adaptor_t *http2_adaptor;
+const int32_t WINDOW_SIZE = 65536;
 
 static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, 
void *context);
 static void _http_record_request(qdr_http2_connection_t *conn, 
qdr_http2_stream_data_t *stream_data);
@@ -178,10 +179,30 @@ void qd_http2_buffer_list_append(qd_http2_buffer_list_t 
*buflist, const uint8_t
     }
 }
 
+
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link (to HTTP2 app).  This routine runs on another I/O thread, so
+// it must be thread safe; hence it takes the server activation lock.
+//
+static void qdr_http2_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+    // prevent the conn from being deleted while running:
+       
sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
+
+    qdr_http2_connection_t *conn = 
(qdr_http2_connection_t*)qd_alloc_deref_safe_ptr(&context);
+    if (conn && conn->pn_raw_conn) {
+       SET_ATOMIC_FLAG(&conn->q2_restart);
+        pn_raw_connection_wake(conn->pn_raw_conn);
+    }
+
+    
sys_mutex_unlock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
+}
+
 /**
  * HTTP :path is mapped to the AMQP 'to' field.
  */
-qd_composed_field_t  *qd_message_compose_amqp(qd_message_t *msg,
+qd_composed_field_t  *qd_message_compose_amqp(qdr_http2_connection_t *conn,
+                                                                               
          qd_message_t *msg,
                                               const char *to,
                                               const char *subject,
                                               const char *reply_to,
@@ -263,11 +284,16 @@ qd_composed_field_t  
*qd_message_compose_amqp(qd_message_t *msg,
     }
     qd_compose_end_list(field);
 
+    qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn);
+    qd_message_set_q2_unblocked_handler(msg, qdr_http2_q2_unblocked_handler, 
conn_sp);
+
     return field;
 }
 
 static size_t write_buffers(qdr_http2_connection_t *conn)
 {
+       if (!conn->pn_raw_conn)
+               return 0;
     qdr_http2_session_data_t *session_data = conn->session_data;
 
     if (!conn->pn_raw_conn)
@@ -319,6 +345,7 @@ static size_t write_buffers(qdr_http2_connection_t *conn)
     return 0;
 }
 
+
 static void free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool 
on_shutdown)
 {
     if (!stream_data)
@@ -342,7 +369,7 @@ static void free_http2_stream_data(qdr_http2_stream_data_t 
*stream_data, bool on
     }
     free(stream_data->reply_to);
     qd_compose_free(stream_data->app_properties);
-    qd_compose_free(stream_data->body);
+    qd_buffer_list_free_buffers(&stream_data->body_buffers);
     qd_compose_free(stream_data->footer_properties);
     if (DEQ_SIZE(session_data->streams) > 0) {
         DEQ_REMOVE(session_data->streams, stream_data);
@@ -360,12 +387,29 @@ static void 
free_http2_stream_data(qdr_http2_stream_data_t *stream_data, bool on
         qd_message_free(stream_data->message);
     }
 
+    //
+    // If the client/server closed the connection abruptly, we need to release 
the stream_data->curr_stream_data and
+    // stream_data->next_stream_data.
+    // This final decref of the delivery is going to free the associated 
message but before this message can be freed
+    // all stream data (body data) objects need to be freed. We do this here.
+    //
     if (stream_data->in_dlv && !stream_data->in_dlv_decrefed) {
+        qd_message_stream_data_release(stream_data->curr_stream_data);
+        stream_data->curr_stream_data = 0;
+
+        qd_message_stream_data_release(stream_data->next_stream_data);
+        stream_data->next_stream_data = 0;
+
         qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, "HTTP2 
adaptor in_dlv - free_http2_stream_data");
     }
 
     if (stream_data->out_dlv && !stream_data->out_dlv_decrefed) {
-        qdr_delivery_decref(http2_adaptor->core, stream_data->out_dlv, "HTTP2 
adaptor out_dlv - free_http2_stream_data");
+        qd_message_stream_data_release(stream_data->curr_stream_data);
+        stream_data->curr_stream_data = 0;
+
+        qd_message_stream_data_release(stream_data->next_stream_data);
+        stream_data->next_stream_data = 0;
+       qdr_delivery_decref(http2_adaptor->core, stream_data->out_dlv, "HTTP2 
adaptor out_dlv - free_http2_stream_data");
     }
 
     free_qdr_http2_stream_data_t(stream_data);
@@ -401,8 +445,10 @@ void free_qdr_http2_connection(qdr_http2_connection_t* 
http_conn, bool on_shutdo
 
     http_conn->context.context = 0;
 
-    if (http_conn->session_data->session)
+    if (http_conn->session_data->session) {
         nghttp2_session_del(http_conn->session_data->session);
+        http_conn->session_data->session = 0;
+    }
 
     free_qdr_http2_session_data_t(http_conn->session_data);
     http_conn->session_data = 0;
@@ -416,11 +462,11 @@ void free_qdr_http2_connection(qdr_http2_connection_t* 
http_conn, bool on_shutdo
         free_qd_http2_buffer_t(buff);
         buff = DEQ_HEAD(http_conn->granted_read_buffs);
     }
-
     qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing 
http2 connection in free_qdr_http2_connection", http_conn->conn_id);
 
     sys_atomic_destroy(&http_conn->raw_closed_read);
     sys_atomic_destroy(&http_conn->raw_closed_write);
+    sys_atomic_destroy(&http_conn->q2_restart);
 
     free_qdr_http2_connection_t(http_conn);
 }
@@ -439,6 +485,7 @@ static qdr_http2_stream_data_t 
*create_http2_stream_data(qdr_http2_session_data_
     stream_data->session_data = session_data;
     stream_data->app_properties = 
qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
     stream_data->status = QD_STREAM_OPEN;
+    DEQ_INIT(stream_data->body_buffers);
     stream_data->start = qd_timer_now();
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Creating new 
stream_data->app_properties=QD_PERFORMATIVE_APPLICATION_PROPERTIES", 
session_data->conn->conn_id, stream_id);
     qd_compose_start_map(stream_data->app_properties);
@@ -450,6 +497,17 @@ static qdr_http2_stream_data_t 
*create_http2_stream_data(qdr_http2_session_data_
 
 
 /**
+ * This callback function is invoked when the nghttp2 library reports an 
error code and error message to the application.
+ */
+static int on_error_callback(nghttp2_session *session, int lib_error_code, 
const char *msg, size_t len, void *user_data)
+{
+       qdr_http2_connection_t *conn = (qdr_http2_connection_t *)user_data;
+       qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"] 
Error generated in the on_error_callback, lib_error_code=%i, error_msg=%s", 
conn->conn_id, lib_error_code, msg);
+       return 0;
+}
+
+
+/**
  * Callback function invoked by nghttp2_session_recv() and 
nghttp2_session_mem_recv() when an invalid non-DATA frame is received
  */
 static int on_invalid_frame_recv_callback(nghttp2_session *session, const 
nghttp2_frame *frame, int lib_error_code, void *user_data)
@@ -475,13 +533,14 @@ static int on_data_chunk_recv_callback(nghttp2_session 
*session,
     if (!stream_data)
         return 0;
 
+    if(stream_data->stream_force_closed)
+       return 0;
+
     stream_data->bytes_in += len;
-    qd_buffer_list_t buffers;
-    DEQ_INIT(buffers);
-    qd_buffer_list_append(&buffers, (uint8_t *)data, len);
+
 
     //
-    // DISPATCH-: If an in_dlv is present it means that the qdr_link_deliver() 
has already been called (delivery has already been routed)
+    // DISPATCH-1868: If an in_dlv is present it means that the 
qdr_link_deliver() has already been called (delivery has already been routed)
     // in which case qd_message_stream_data_append can be called to append 
buffers to the message body
     // If stream_data->in_dlv = 0 but stream_data->header_and_props_composed 
is true, it means that the message has not been routed yet
     // but the message already has headers and properties
@@ -490,43 +549,32 @@ static int on_data_chunk_recv_callback(nghttp2_session 
*session,
     // We want to be able to keep collecting the incoming DATA in the message 
object so we can ultimately route it when the credit does ultimately arrive.
     //
     if (stream_data->in_dlv || stream_data->header_and_props_composed) {
-        if (!stream_data->stream_force_closed) {
-            // DISPATCH-1868: Part of the HTTP2 message body arrives *before* 
we can route the delivery. So we accumulated that body
-            // in the stream_data->body (in the else part). But before the 
rest of the HTTP2 data arrives, we got credit to send the delivery
-            // and we have an in_dlv object now. Now, we take the buffers that 
were added previously to stream_data->body and call 
qd_message_stream_data_append
-            if (stream_data->body) {
-                if (!stream_data->body_data_added) {
-                    qd_buffer_list_t existing_buffers;
-                    DEQ_INIT(existing_buffers);
-                    qd_compose_take_buffers(stream_data->body, 
&existing_buffers);
-                    // @TODO(kgiusti): handle Q2 block event:
-                    qd_message_stream_data_append(stream_data->message, 
&existing_buffers, 0);
-                    stream_data->body_data_added = true;
-                }
+        qd_buffer_list_t buffers;
+        DEQ_INIT(buffers);
+        qd_buffer_list_append(&buffers, (uint8_t *)data, len);
+        // DISPATCH-1868: Part of the HTTP2 message body arrives *before* we 
can route the delivery. So we accumulated the body buffers
+        // in the stream_data->body_buffers. But before the rest of the HTTP2 
data arrives, we got credit to send the delivery
+        // and we have an in_dlv object now. Now, we take the buffers that 
were added previously to stream_data->body_buffers and call 
qd_message_stream_data_append
+        bool q2_blocked1 = false;
+        if (DEQ_SIZE(stream_data->body_buffers) > 0) {
+            if (!stream_data->body_data_added_to_msg) {
+                qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, &q2_blocked1);
             }
-            else {
-                // Add a dummy body so that other code that checks for the 
presense of stream_data->body will be satisfied.
-                // This dummy body field will be be used and will not be sent.
-                stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-                stream_data->body_data_added = true;
-            }
-            // @TODO(kgiusti): handle Q2 block event:
-            qd_message_stream_data_append(stream_data->message, &buffers, 0);
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback 
qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, 
stream_id);
         }
-        else {
-            qd_buffer_list_free_buffers(&buffers);
+        bool q2_blocked2 = false;
+        qd_message_stream_data_append(stream_data->message, &buffers, 
&q2_blocked2);
+        stream_data->body_data_added_to_msg = true;
+        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback 
qd_compose_insert_binary_buffers into stream_data->message", conn->conn_id, 
stream_id);
+        conn->q2_blocked = conn->q2_blocked || q2_blocked1 || q2_blocked2;
+
+        if (conn->q2_blocked) {
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id);
         }
     }
     else {
-        if (stream_data->stream_force_closed) {
-            qd_buffer_list_free_buffers(&buffers);
-        }
-        else {
-            stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 
stream_data->body);
-            qd_compose_insert_binary_buffers(stream_data->body, &buffers);
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback 
qd_compose_insert_binary_buffers into stream_data->body", conn->conn_id, 
stream_id);
-        }
+        // Keep inserting buffers to stream_data->body_buffers.
+        qd_buffer_list_append(&stream_data->body_buffers, (uint8_t *)data, 
len);
+        qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback 
qd_compose_insert_binary_buffers into stream_data->body_buffers", 
conn->conn_id, stream_id);
     }
 
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 DATA on_data_chunk_recv_callback data length 
%zu", conn->conn_id, stream_id, len);
@@ -566,6 +614,8 @@ static int snd_data_callback(nghttp2_session *session,
     qdr_http2_session_data_t *session_data = conn->session_data;
     qdr_http2_stream_data_t *stream_data = (qdr_http2_stream_data_t 
*)source->ptr;
 
+    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback length=%zu", conn->conn_id, 
stream_data->stream_id, length);
+
     int bytes_sent = 0; // This should not include the header length of 9.
     bool write_buffs = false;
     if (length) {
@@ -576,25 +626,44 @@ static int snd_data_callback(nghttp2_session *session,
         memcpy(qd_http2_buffer_cursor(http2_buff), framehd, 
HTTP2_DATA_FRAME_HEADER_LENGTH);
         qd_http2_buffer_insert(http2_buff, HTTP2_DATA_FRAME_HEADER_LENGTH);
         pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send];
-        qd_message_stream_data_buffers(stream_data->curr_stream_data, 
pn_raw_buffs, 0, stream_data->qd_buffers_to_send);
+        int written = 
qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 
stream_data->curr_stream_data_qd_buff_offset, stream_data->qd_buffers_to_send);
+
+        assert (written == stream_data->qd_buffers_to_send);
 
         int idx = 0;
+        size_t bytes_to_send = length;
+
         while (idx < stream_data->qd_buffers_to_send) {
             if (pn_raw_buffs[idx].size > 0) {
-                //int bytes_remaining = length - bytes_sent;
-                //if (bytes_remaining > pn_raw_buffs[idx].size) {
+               if (bytes_to_send < pn_raw_buffs[idx].size) {
+                       int bytes_remaining_in_buffer = pn_raw_buffs[idx].size 
- stream_data->curr_stream_data_offset;
+                       if (bytes_remaining_in_buffer < bytes_to_send) {
+                               memcpy(qd_http2_buffer_cursor(http2_buff), 
pn_raw_buffs[idx].bytes + stream_data->curr_stream_data_offset, 
bytes_remaining_in_buffer);
+                               qd_http2_buffer_insert(http2_buff, 
bytes_remaining_in_buffer);
+                               stream_data->curr_stream_data_offset = 0;
+                               bytes_to_send -= bytes_remaining_in_buffer;
+                               bytes_sent += bytes_remaining_in_buffer;
+                       }
+                       else {
+                                               
memcpy(qd_http2_buffer_cursor(http2_buff), pn_raw_buffs[idx].bytes + 
stream_data->curr_stream_data_offset, bytes_to_send);
+                                               
qd_http2_buffer_insert(http2_buff, bytes_to_send);
+                                               
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_to_send=%zu", 
conn->conn_id, stream_data->stream_id, bytes_to_send);
+                                               
stream_data->curr_stream_data_offset += bytes_to_send;
+                                               bytes_sent += bytes_to_send;
+                                               if 
(stream_data->curr_stream_data_offset == BUFFER_SIZE || 
stream_data->curr_stream_data_offset == pn_raw_buffs[idx].size) {
+                                                       
stream_data->curr_stream_data_offset = 0;
+                                                       
stream_data->curr_stream_data_qd_buff_offset += 1;
+                                               }
+                       }
+               }
+               else {
                     memcpy(qd_http2_buffer_cursor(http2_buff), 
pn_raw_buffs[idx].bytes, pn_raw_buffs[idx].size);
                     qd_http2_buffer_insert(http2_buff, pn_raw_buffs[idx].size);
-                    bytes_sent += pn_raw_buffs[idx].size;
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy pn_raw_buffs[%i].size=%u", 
conn->conn_id, stream_data->stream_id, idx, pn_raw_buffs[idx].size);
-//                }
-//                else {
-//                    memcpy(qd_http2_buffer_cursor(http2_buff), 
pn_raw_buffs[idx].bytes, bytes_remaining);
-//                    qd_http2_buffer_insert(http2_buff, bytes_remaining);
-//                    bytes_sent += bytes_remaining;
-//                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", 
conn->conn_id, stream_data->stream_id, bytes_remaining);
-//                }
-                stream_data->curr_stream_data_qd_buff_offset += 1;
+                    stream_data->curr_stream_data_qd_buff_offset += 1;
+                    bytes_to_send -= pn_raw_buffs[idx].size;
+                    bytes_sent += pn_raw_buffs[idx].size;
+               }
             }
             idx += 1;
         }
@@ -617,7 +686,12 @@ static int snd_data_callback(nghttp2_session *session,
         else {
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, 
out_msg_has_footer", conn->conn_id, stream_data->stream_id);
         }
+        stream_data->curr_stream_data_offset = 0;
         stream_data->curr_stream_data_qd_buff_offset = 0;
+        stream_data->payload_handled = 0;
+    }
+    else {
+       stream_data->payload_handled += bytes_sent;
     }
 
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, 
bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, 
bytes_sent, (void *)stream_data);
@@ -626,8 +700,9 @@ static int snd_data_callback(nghttp2_session *session,
         assert(bytes_sent == length);
     }
 
-    if (write_buffs)
+    if (write_buffs) {
         write_buffers(conn);
+    }
 
     return 0;
 
@@ -741,6 +816,8 @@ static int on_header_callback(nghttp2_session *session,
 
                 qd_compose_insert_string_n(stream_data->footer_properties, 
(const char *)name, namelen);
                 qd_compose_insert_string_n(stream_data->footer_properties, 
(const char *)value, valuelen);
+
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 FOOTER Incoming [%s=%s]", conn->conn_id, 
stream_data->stream_id, (char *)name, (char *)value);
             }
             else {
                 if (strcmp(METHOD, (const char *)name) == 0) {
@@ -751,8 +828,10 @@ static int on_header_callback(nghttp2_session *session,
                 }
                 qd_compose_insert_string_n(stream_data->app_properties, (const 
char *)name, namelen);
                 qd_compose_insert_string_n(stream_data->app_properties, (const 
char *)value, valuelen);
+
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, 
stream_data->stream_id, (char *)name, (char *)value);
             }
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] HTTP2 HEADER Incoming [%s=%s]", conn->conn_id, 
stream_data->stream_id, (char *)name, (char *)value);
+
         }
         break;
         default:
@@ -767,7 +846,8 @@ static bool compose_and_deliver(qdr_http2_connection_t 
*conn, qdr_http2_stream_d
     if (!stream_data->header_and_props_composed) {
         qd_composed_field_t  *header_and_props = 0;
         if (conn->ingress) {
-            header_and_props = qd_message_compose_amqp(stream_data->message,
+            header_and_props = qd_message_compose_amqp(conn,
+                                                       stream_data->message,
                                                        conn->config->address,  
// const char *to
                                                        stream_data->method,    
// const char *subject
                                                        stream_data->reply_to,  
// const char *reply_to
@@ -777,7 +857,8 @@ static bool compose_and_deliver(qdr_http2_connection_t 
*conn, qdr_http2_stream_d
                                                        conn->config->site);
         }
         else {
-            header_and_props = qd_message_compose_amqp(stream_data->message,
+            header_and_props = qd_message_compose_amqp(conn,
+                                                       stream_data->message,
                                                        stream_data->reply_to,  
      // const char *to
                                                        
stream_data->request_status,  // const char *subject
                                                        0,                      
      // const char *reply_to
@@ -789,41 +870,78 @@ static bool compose_and_deliver(qdr_http2_connection_t 
*conn, qdr_http2_stream_d
 
         if (receive_complete) {
             qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = true in 
compose_and_deliver", conn->conn_id, stream_data->stream_id, 
stream_data->in_link->identity);
-            if (!stream_data->body) {
-                stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-                qd_compose_insert_binary(stream_data->body, 0, 0);
-                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Inserting empty body data in compose_and_deliver", 
conn->conn_id, stream_data->stream_id);
-            }
-
+            bool q2_blocked;
             if (stream_data->footer_properties) {
-                qd_message_compose_5(stream_data->message, header_and_props, 
stream_data->app_properties, stream_data->body, stream_data->footer_properties, 
receive_complete);
+                qd_message_compose_3(stream_data->message, header_and_props, 
stream_data->app_properties, receive_complete);
+                qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, &q2_blocked);
+                stream_data->body_data_added_to_msg = true;
+
+                qd_buffer_list_t existing_buffers;
+                DEQ_INIT(existing_buffers);
+                qd_compose_take_buffers(stream_data->footer_properties, 
&existing_buffers);
+                qd_message_stream_data_footer_append(stream_data->message, 
&existing_buffers);
             }
             else {
-                qd_message_compose_4(stream_data->message, header_and_props, 
stream_data->app_properties, stream_data->body, receive_complete);
+                qd_message_compose_3(stream_data->message, header_and_props, 
stream_data->app_properties, receive_complete);
+                qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, &q2_blocked);
+                stream_data->body_data_added_to_msg = true;
             }
+
+            conn->q2_blocked = conn->q2_blocked || q2_blocked;
+               if (conn->q2_blocked) {
+                       qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id);
+               }
         }
         else {
-            if (stream_data->body) {
-                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has 
stream_data->body in compose_and_deliver", conn->conn_id, 
stream_data->stream_id, stream_data->in_link->identity);
+            if (DEQ_SIZE(stream_data->body_buffers) > 0) {
+                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] receive_complete = false and has 
stream_data->body_buffers in compose_and_deliver", conn->conn_id, 
stream_data->stream_id, stream_data->in_link->identity);
+                bool q2_blocked;
                 if (stream_data->footer_properties) {
-                    qd_message_compose_5(stream_data->message, 
header_and_props, stream_data->app_properties, stream_data->body, 
stream_data->footer_properties, receive_complete);
+                       if (!stream_data->entire_footer_arrived) {
+                               qd_compose_free(header_and_props);
+                               return false;
+                       }
+
+                    qd_message_compose_3(stream_data->message, 
header_and_props, stream_data->app_properties, receive_complete);
+                    qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, &q2_blocked);
+                    qd_buffer_list_t existing_buffers;
+                    DEQ_INIT(existing_buffers);
+                    qd_compose_take_buffers(stream_data->footer_properties, 
&existing_buffers);
+                    qd_message_stream_data_footer_append(stream_data->message, 
&existing_buffers);
                 }
                 else {
-                    qd_message_compose_4(stream_data->message, 
header_and_props, stream_data->app_properties, stream_data->body, 
receive_complete);
+                       qd_message_compose_3(stream_data->message, 
header_and_props, stream_data->app_properties, receive_complete);
+                       qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, &q2_blocked);
                 }
-                stream_data->body_data_added = true;
+                stream_data->body_data_added_to_msg = true;
+                conn->q2_blocked = conn->q2_blocked || q2_blocked;
+                       if (conn->q2_blocked) {
+                               qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"] q2 is blocked on this connection", conn->conn_id);
+                       }
             }
             else {
-
                 if (stream_data->footer_properties) {
+
+                       if (!stream_data->entire_footer_arrived) {
+                               qd_compose_free(header_and_props);
+                               return false;
+                       }
+
                     //
                     // The footer has already arrived but there was no body. 
Insert an empty body
                     //
-                    stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 
0);
-                    qd_message_compose_5(stream_data->message, 
header_and_props, stream_data->app_properties, stream_data->body, 
stream_data->footer_properties, receive_complete);
+                    qd_message_compose_3(stream_data->message, 
header_and_props, stream_data->app_properties, receive_complete);
+                    qd_message_stream_data_append(stream_data->message, 
&stream_data->body_buffers, 0);
+
+                    qd_buffer_list_t existing_buffers;
+                    DEQ_INIT(existing_buffers);
+                    qd_compose_take_buffers(stream_data->footer_properties, 
&existing_buffers);
+                    qd_message_stream_data_footer_append(stream_data->message, 
&existing_buffers);
+                    stream_data->body_data_added_to_msg = true;
                 }
                 else {
                     qd_message_compose_3(stream_data->message, 
header_and_props, stream_data->app_properties, receive_complete);
+                    stream_data->body_data_added_to_msg = false;
                 }
             }
         }
@@ -851,7 +969,7 @@ static bool route_delivery(qdr_http2_stream_data_t 
*stream_data, bool receive_co
 {
     qdr_http2_connection_t *conn  = stream_data->session_data->conn;
     if (stream_data->in_dlv) {
-        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] in_dlv already present, not routing delivery", 
conn->conn_id, stream_data->stream_id);
+        qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] in_dlv already present, delivery already routed", 
conn->conn_id, stream_data->stream_id);
         return false;
     }
 
@@ -879,7 +997,7 @@ static void create_settings_frame(qdr_http2_connection_t 
*conn)
 {
     qdr_http2_session_data_t *session_data = conn->session_data;
     nghttp2_settings_entry iv[3] = {{NGHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 
100},
-                                    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 
65536},
+                                    {NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, 
WINDOW_SIZE},
                                     {NGHTTP2_SETTINGS_ENABLE_PUSH, 0}};
 
     // You must call nghttp2_session_send after calling nghttp2_submit_settings
@@ -954,8 +1072,8 @@ static int on_frame_recv_callback(nghttp2_session *session,
         // We will also close the pn_raw_connection (we will not close the 
qdr_connection_t and the qdr_http2_connection_t, those will still remain). This 
will close the TCP connection to the server
         // and will enable creation  of a new connection to the server since 
we are not allowed to create any more streams on the connection that received 
the GOAWAY frame.
         //
-        qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, 
"[C%"PRIu64"][S%"PRId32"] GOAWAY frame received", conn->conn_id, stream_id);
         int32_t last_stream_id = frame->goaway.last_stream_id;
+        qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, 
"[C%"PRIu64"][S%"PRId32"] GOAWAY frame received, last_stream_id=[%"PRId32"]", 
conn->conn_id, stream_id, last_stream_id);
         // Free all streams that are greater that the last_stream_id because 
the server is not going to process those streams.
         free_unprocessed_streams(conn, last_stream_id);
         conn->goaway_received = true;
@@ -995,15 +1113,6 @@ static int on_frame_recv_callback(nghttp2_session 
*session,
         }
 
         if (stream_data->in_dlv && !stream_data->stream_force_closed) {
-            if (!stream_data->body) {
-                stream_data->body = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-                qd_compose_insert_binary(stream_data->body, 0, 0);
-                // @TODO(kgiusti): handle Q2 block event:
-                qd_message_extend(stream_data->message, stream_data->body, 0);
-            }
-        }
-
-        if (stream_data->in_dlv && !stream_data->stream_force_closed) {
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] NGHTTP2_DATA frame received, qdr_delivery_continue 
"DLV_FMT, conn->conn_id, stream_id, DLV_ARGS(stream_data->in_dlv));
             qdr_delivery_continue(http2_adaptor->core, stream_data->in_dlv, 
false);
         }
@@ -1034,7 +1143,6 @@ static int on_frame_recv_callback(nghttp2_session 
*session,
             if (stream_data->use_footer_properties) {
                 qd_compose_end_map(stream_data->footer_properties);
                 stream_data->entire_footer_arrived = true;
-                // @TODO(kgiusti): handle Q2 block event:
                 qd_message_extend(stream_data->message, 
stream_data->footer_properties, 0);
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Closing footer map, extending message with footer", 
conn->conn_id, stream_id);
             }
@@ -1095,6 +1203,23 @@ static int on_frame_recv_callback(nghttp2_session 
*session,
     return 0;
 }
 
+/**
+ * Works out how many qd_buffers of the current stream data are needed to cover
+ * bytes_to_send bytes and stores that count in stream_data->qd_buffers_to_send.
+ *
+ * Counting starts at buffer index curr_stream_data_qd_buff_offset. The running byte
+ * total starts at minus curr_stream_data_offset (the offset within the qd_buffer),
+ * so those bytes of the first buffer are not counted towards bytes_to_send.
+ *
+ * @param stream_data   stream whose curr_stream_data buffers are examined
+ * @param bytes_to_send number of payload bytes the caller intends to send
+ */
+static void set_buffers_to_send(qdr_http2_stream_data_t *stream_data, size_t bytes_to_send)
+{
+    size_t diff = qd_message_stream_data_buffer_count(stream_data->curr_stream_data) - stream_data->curr_stream_data_qd_buff_offset;
+    if (diff == 0) {
+        // No buffers remain past the offset. A zero-length variable length array is
+        // undefined behavior, so record the (empty) result and stop here.
+        stream_data->qd_buffers_to_send = 0;
+        return;
+    }
+
+    pn_raw_buffer_t pn_raw_buffs[diff];
+    int written = qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, stream_data->curr_stream_data_qd_buff_offset, diff);
+    assert(written >= 0 && diff == (size_t) written);
+
+    size_t  idx           = 0;
+    // Kept signed and 64 bits wide: the total starts out negative and must never be
+    // implicitly converted to an unsigned type while it is still below zero.
+    int64_t rolling_count = -(int64_t) stream_data->curr_stream_data_offset;
+    while (written > 0 && idx < (size_t) written) {
+        rolling_count += pn_raw_buffs[idx].size;
+        idx += 1;
+        if (rolling_count >= 0 && (uint64_t) rolling_count >= bytes_to_send)
+            break;
+    }
+    stream_data->qd_buffers_to_send = idx;
+}
 
 ssize_t read_data_callback(nghttp2_session *session,
                       int32_t stream_id,
@@ -1109,10 +1234,9 @@ ssize_t read_data_callback(nghttp2_session *session,
     qd_message_t *message = qdr_delivery_message(stream_data->out_dlv);
     qd_message_depth_status_t status = qd_message_check_depth(message, 
QD_DEPTH_BODY);
 
-    // This flag tells nghttp2 that the data is not being copied into its 
buffer (uint8_t *buf).
+    // This flag tells nghttp2 that the data is not being copied into the 
buffer supplied by nghttp2 (uint8_t *buf).
     *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
 
-
     switch (status) {
     case QD_MESSAGE_DEPTH_OK: {
         //
@@ -1134,7 +1258,7 @@ ssize_t read_data_callback(nghttp2_session *session,
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get 
qd_message_next_stream_data", conn->conn_id, stream_data->stream_id);
         }
 
-        if (stream_data->next_stream_data == 0 && 
stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) {
+        if (stream_data->next_stream_data == 0 && 
(stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE || 
stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INVALID)) {
             stream_data->curr_stream_data_result = 
stream_data->next_stream_data_result;
         }
 
@@ -1163,6 +1287,7 @@ ssize_t read_data_callback(nghttp2_session *session,
 
                 // The payload length is zero on this body data. Look ahead 
one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE
                 stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
+
                 if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_NO_MORE) {
                     if (!stream_data->out_msg_has_footer) {
                         
qd_message_stream_data_release(stream_data->curr_stream_data);
@@ -1196,53 +1321,52 @@ ssize_t read_data_callback(nghttp2_session *session,
                 return 0;
             }
 
-            stream_data->stream_data_buff_count = 
qd_message_stream_data_buffer_count(stream_data->curr_stream_data);
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback, 
stream_data->stream_data_buff_count=%i, payload_length=%zu", conn->conn_id, 
stream_data->stream_id, stream_data->stream_data_buff_count, payload_length);
-
             size_t bytes_to_send = 0;
             if (payload_length) {
-                size_t remaining_payload_length = payload_length - 
(stream_data->curr_stream_data_qd_buff_offset * BUFFER_SIZE);
+                int remaining_payload_length = payload_length - 
stream_data->payload_handled;
 
+                qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length=%i, 
length=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, 
length);
 
                 if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) {
-                    bytes_to_send = remaining_payload_length;
-                    stream_data->qd_buffers_to_send = 
stream_data->stream_data_buff_count;
-                    stream_data->full_payload_handled = true;
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= 
QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, 
stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, 
remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send);
-
-                    // Look ahead one body data
-                    stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
-                    if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_NO_MORE) {
-                        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
-                        stream_data->out_msg_data_flag_eof = true;
-                        stream_data->out_msg_body_sent = true;
-                        if (stream_data->next_stream_data) {
-                            
qd_message_stream_data_release(stream_data->next_stream_data);
-                            stream_data->next_stream_data = 0;
+                       if (length < remaining_payload_length) {
+                               bytes_to_send = length;
+                               set_buffers_to_send(stream_data, bytes_to_send);
+                               stream_data->full_payload_handled = false;
+                       }
+                       else {
+                               bytes_to_send = remaining_payload_length;
+                        set_buffers_to_send(stream_data, bytes_to_send);
+                               stream_data->full_payload_handled = true;
+                               qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback 
remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), 
bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, 
stream_data->stream_id, remaining_payload_length, bytes_to_send, 
stream_data->qd_buffers_to_send);
+
+                        // Look ahead one body data
+                        stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                        if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_NO_MORE) {
+                            *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+                            stream_data->out_msg_data_flag_eof = true;
+                            stream_data->out_msg_body_sent = true;
+                            stream_data->out_dlv_local_disposition = 
PN_ACCEPTED;
+                            qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, 
stream_data->stream_id);
                         }
-                        stream_data->out_dlv_local_disposition = PN_ACCEPTED;
-                        qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, 
stream_data->stream_id);
-                    }
-                    else if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_INCOMPLETE) {
-                        qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, 
stream_data->stream_id);
-
-                    }
-                    else if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_BODY_OK) {
-                        qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data QD_MESSAGE_STREAM_DATA_OK", conn->conn_id, stream_data->stream_id);
-
-                    }
-                    else if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
-                        stream_data->out_msg_body_sent = true;
-                        qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, 
stream_data->stream_id);
-                    }
+                        else if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_FOOTER_OK) {
+                            stream_data->out_msg_body_sent = true;
+                            qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one 
body data, QD_MESSAGE_STREAM_DATA_FOOTER_OK", conn->conn_id, 
stream_data->stream_id);
+                        }
+                       }
                 }
                 else {
-                    // This means that there is more that 16k worth of payload 
in one body data.
-                    // We want to send only 16k data per read_data_callback
-                    bytes_to_send = QD_HTTP2_BUFFER_SIZE;
-                    stream_data->full_payload_handled = false;
-                    stream_data->qd_buffers_to_send = 
NUM_QD_BUFFERS_IN_ONE_HTTP2_BUFFER;
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= 
QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, 
stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, 
bytes_to_send, stream_data->qd_buffers_to_send);
+                       if (length < remaining_payload_length) {
+                               bytes_to_send = length;
+                               set_buffers_to_send(stream_data, bytes_to_send);
+                    }
+                       else {
+                                               // This means that there is more than 16k worth of payload in one body data.
+                                               // We want to send only 16k data per read_data_callback
+                                               bytes_to_send = 
QD_HTTP2_BUFFER_SIZE;
+                                               
set_buffers_to_send(stream_data, bytes_to_send);
+                                               
qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length <= 
QD_HTTP2_BUFFER_SIZE ELSE bytes_to_send=%zu, 
stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, 
bytes_to_send, stream_data->qd_buffers_to_send);
+                       }
+                       stream_data->full_payload_handled = false;
                 }
             }
 
@@ -1252,12 +1376,19 @@ ssize_t read_data_callback(nghttp2_session *session,
         }
 
         case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
-            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", 
conn->conn_id, stream_data->stream_id);
+               qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK", 
conn->conn_id, stream_data->stream_id);
             stream_data->out_msg_has_footer = true;
             stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
             if (stream_data->next_stream_data) {
                 qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_FOOTER_OK, 
we have a next_stream_data", conn->conn_id, stream_data->stream_id);
             }
+            if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_INVALID) {
+               stream_data->out_msg_has_footer = false;
+                               if (stream_data->next_stream_data) {
+                                       
qd_message_stream_data_release(stream_data->next_stream_data);
+                                       stream_data->next_stream_data = 0;
+                               }
+            }
             break;
 
         case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
@@ -1271,7 +1402,6 @@ ssize_t read_data_callback(nghttp2_session *session,
         case QD_MESSAGE_STREAM_DATA_NO_MORE: {
             //
             // We have already handled the last body-data segment for this 
delivery.
-            // Complete the "sending" of this delivery and replenish credit.
             //
             size_t pn_buffs_write_capacity = 
pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn);
             if (pn_buffs_write_capacity == 0) {
@@ -1310,9 +1440,10 @@ ssize_t read_data_callback(nghttp2_session *session,
             //
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
             stream_data->out_msg_data_flag_eof = true;
-            if (stream_data->curr_stream_data)
+            if (stream_data->curr_stream_data) {
                 qd_message_stream_data_release(stream_data->curr_stream_data);
-            stream_data->curr_stream_data = 0;
+               stream_data->curr_stream_data = 0;
+            }
             stream_data->out_dlv_local_disposition = PN_REJECTED;
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, 
"[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INVALID", 
conn->conn_id, stream_data->stream_id);
             break;
@@ -1349,7 +1480,7 @@ qdr_http2_connection_t 
*qdr_http_connection_ingress(qd_http_listener_t* listener
     ingress_http_conn->pn_raw_conn = pn_raw_connection();
     sys_atomic_init(&ingress_http_conn->raw_closed_read, 0);
     sys_atomic_init(&ingress_http_conn->raw_closed_write, 0);
-
+    sys_atomic_init(&ingress_http_conn->q2_restart, 0);
     ingress_http_conn->session_data = new_qdr_http2_session_data_t();
     ZERO(ingress_http_conn->session_data);
     DEQ_INIT(ingress_http_conn->session_data->buffs);
@@ -1659,6 +1790,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
 {
     //stream_data->processing = true;
     qdr_http2_session_data_t *session_data = stream_data->session_data;
+
+    if (!stream_data->session_data)
+       return 0;
+
     qdr_http2_connection_t *conn = session_data->conn;
 
        if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))
@@ -1666,6 +1801,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
 
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
Starting to handle_outgoing_http", conn->conn_id);
     if (stream_data->out_dlv) {
+
         qd_message_t *message = qdr_delivery_message(stream_data->out_dlv);
 
         if (stream_data->out_msg_send_complete) {
@@ -1674,6 +1810,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
         }
 
         if (!stream_data->out_msg_header_sent) {
+
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"] Header not sent yet", conn->conn_id);
 
             qd_iterator_t *group_id_itr = qd_message_field_iterator(message, 
QD_FIELD_GROUP_ID);
@@ -1721,8 +1858,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
             stream_data->curr_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->curr_stream_data);
             if (stream_data->curr_stream_data_result == 
QD_MESSAGE_STREAM_DATA_BODY_OK) {
                 size_t payload_length = 
qd_message_stream_data_payload_length(stream_data->curr_stream_data);
+
                 if (payload_length == 0) {
-                    stream_data->next_stream_data_result = 
qd_message_next_stream_data(message, &stream_data->next_stream_data);
+                    stream_data->next_stream_data_result =     
qd_message_next_stream_data(message, &stream_data->next_stream_data);
+
                     if (stream_data->next_stream_data_result == 
QD_MESSAGE_STREAM_DATA_NO_MORE) {
                         if (stream_data->next_stream_data) {
                             
qd_message_stream_data_release(stream_data->next_stream_data);
@@ -1730,6 +1869,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
                         }
 
                         
qd_message_stream_data_release(stream_data->curr_stream_data);
+
                         stream_data->curr_stream_data = 0;
                         flags = NGHTTP2_FLAG_END_STREAM;
                         stream_data->out_msg_has_body = false;
@@ -1767,6 +1907,7 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
                 free(hdrs[idx].name);
                 free(hdrs[idx].value);
             }
+
         }
         else {
             qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] Headers already submitted, Proceeding with the body", 
conn->conn_id, stream_data->stream_id);
@@ -1795,8 +1936,10 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
                     qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, 
"[C%"PRIu64"][S%"PRId32"] Error submitting data rv=%i", conn->conn_id, 
stream_data->stream_id, rv);
                 }
                 else {
-                    nghttp2_session_send(session_data->session);
-                    qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", conn->conn_id, 
stream_data->stream_id);
+                       if (session_data->session) {
+                               nghttp2_session_send(session_data->session);
+                               qd_log(http2_adaptor->protocol_log_source, 
QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] nghttp2_session_send - done", 
conn->conn_id, stream_data->stream_id);
+                       }
                 }
             }
         }
@@ -2067,7 +2210,7 @@ static int handle_incoming_http(qdr_http2_connection_t 
*conn)
     else {
         grant_read_buffers(conn);
     }
-    nghttp2_session_send(conn-> session_data->session);
+    nghttp2_session_send(conn->session_data->session);
 
     return count;
 }
@@ -2322,6 +2465,8 @@ qdr_http2_connection_t 
*qdr_http_connection_egress(qd_http_connector_t *connecto
     egress_http_conn->session_data->conn = egress_http_conn;
     sys_atomic_init(&egress_http_conn->raw_closed_read, 0);
     sys_atomic_init(&egress_http_conn->raw_closed_write, 0);
+    sys_atomic_init(&egress_http_conn->q2_restart, 0);
+
     sys_mutex_lock(http2_adaptor->lock);
     DEQ_INSERT_TAIL(http2_adaptor->connections, egress_http_conn);
     sys_mutex_unlock(http2_adaptor->lock);
@@ -2395,6 +2540,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
+        if (conn->q2_blocked) {
+            conn->q2_blocked = false;
+        }
        SET_ATOMIC_FLAG(&conn->raw_closed_read);
         if (conn->pn_raw_conn)
             pn_raw_connection_close(conn->pn_raw_conn);
@@ -2419,10 +2567,6 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
             }
         }
         conn->connection_established = false;
-        if (conn->goaway_received) {
-            nghttp2_session_del(conn->session_data->session);
-            conn->session_data->session = 0;
-        }
         handle_disconnected(conn);
         break;
     }
@@ -2437,10 +2581,21 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_WAKE: {
         qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE 
Wake-up", conn->conn_id);
+        if (CLEAR_ATOMIC_FLAG(&conn->q2_restart)) {
+            conn->q2_blocked = false;
+            qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, 
"[C%"PRIu64"] q2 is unblocked on this connection", conn->conn_id);
+            handle_incoming_http(conn);
+        }
+
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_READ: {
+       // We don't want to read when we are q2 blocked.
+       if (conn->q2_blocked) {
+               return;
+       }
+
         int read = handle_incoming_http(conn);
         qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i 
bytes", conn->conn_id, read);
         break;
@@ -2662,6 +2817,7 @@ static void qdr_http2_adaptor_init(qdr_core_t *core, void 
**adaptor_context)
     nghttp2_session_callbacks_set_send_data_callback(callbacks, 
snd_data_callback);
     nghttp2_session_callbacks_set_send_callback(callbacks, send_callback);
     nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(callbacks, 
on_invalid_frame_recv_callback);
+    nghttp2_session_callbacks_set_error_callback2(callbacks, 
on_error_callback);
 
     adaptor->callbacks = callbacks;
     http2_adaptor = adaptor;
diff --git a/src/adaptors/http2/http2_adaptor.h 
b/src/adaptors/http2/http2_adaptor.h
index 7aafc2f..9373066 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -83,7 +83,7 @@ struct qdr_http2_stream_data_t {
     qd_message_t             *message;
     qd_composed_field_t      *app_properties;
     qd_composed_field_t      *footer_properties;
-    qd_composed_field_t      *body;
+    qd_buffer_list_t          body_buffers;
     qd_message_stream_data_t *curr_stream_data;
     qd_message_stream_data_t *next_stream_data;
     qd_message_stream_data_t *footer_stream_data;
@@ -92,7 +92,8 @@ struct qdr_http2_stream_data_t {
     qd_message_stream_data_result_t  curr_stream_data_result;
     qd_message_stream_data_result_t  next_stream_data_result;
     int                            curr_stream_data_qd_buff_offset;
-    int                            stream_data_buff_count;
+    int                            curr_stream_data_offset; // The offset 
within the qd_buffer so we can jump there.
+    int                                                   payload_handled;
     int                            in_link_credit;   // provided by router
     int32_t                        stream_id;
     size_t                         qd_buffers_to_send;
@@ -111,10 +112,9 @@ struct qdr_http2_stream_data_t {
     bool                     disp_applied;   // Has the disp been applied to 
the out_dlv. The stream is ready to be freed now.
     bool                     header_and_props_composed;  // true if the header 
and properties of the inbound message have already been composed so we don't 
have to do it again.
     bool                     stream_force_closed;
-
     bool                     in_dlv_decrefed;
     bool                     out_dlv_decrefed;
-    bool                     body_data_added;
+    bool                     body_data_added_to_msg;
     int                      bytes_in;
     int                      bytes_out;
     qd_timestamp_t           start;
@@ -152,7 +152,8 @@ struct qdr_http2_connection_t {
     bool                      goaway_received;
     sys_atomic_t                     raw_closed_read;
     sys_atomic_t                         raw_closed_write;
-
+    bool                      q2_blocked;      // send a connection level 
WINDOW_UPDATE frame to tell the client to stop sending data.
+    sys_atomic_t              q2_restart;      // signal to resume receive
     DEQ_LINKS(qdr_http2_connection_t);
  };
 
diff --git a/src/message.c b/src/message.c
index 2d8d579..e204730 100644
--- a/src/message.c
+++ b/src/message.c
@@ -42,6 +42,8 @@
 #include <string.h>
 #include <time.h>
 
+// Debug-build sanity check: asserts that the buffer list blist holds no more than
+// QD_QLIMIT_Q2_LOWER buffers. Expands to assert(), so it is compiled out under NDEBUG.
+ #define CHECK_Q2(blist) assert(DEQ_SIZE(blist) <= QD_QLIMIT_Q2_LOWER)
+
 #define LOCK   sys_mutex_lock
 #define UNLOCK sys_mutex_unlock
 
@@ -2390,8 +2392,11 @@ void qd_message_compose_4(qd_message_t *msg, 
qd_composed_field_t *field1, qd_com
     qd_message_content_t *content        = MSG_CONTENT(msg);
     SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
+    CHECK_Q2(*field1_buffers);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
+    CHECK_Q2(*field2_buffers);
     qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
+    CHECK_Q2(*field3_buffers);
 
     content->buffers = *field1_buffers;
     DEQ_INIT(*field1_buffers);
@@ -2404,10 +2409,13 @@ void qd_message_compose_5(qd_message_t *msg, 
qd_composed_field_t *field1, qd_com
     qd_message_content_t *content        = MSG_CONTENT(msg);
     SET_ATOMIC_BOOL(&content->receive_complete, receive_complete);
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
+    CHECK_Q2(*field1_buffers);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
+    CHECK_Q2(*field2_buffers);
     qd_buffer_list_t     *field3_buffers = qd_compose_buffers(field3);
+    CHECK_Q2(*field3_buffers);
     qd_buffer_list_t     *field4_buffers = qd_compose_buffers(field4);
-
+    CHECK_Q2(*field4_buffers);
     content->buffers = *field1_buffers;
     DEQ_INIT(*field1_buffers);
     DEQ_APPEND(content->buffers, (*field2_buffers));
@@ -2998,18 +3006,33 @@ bool qd_message_oversize(const qd_message_t *msg)
 }
 
 
+int qd_message_stream_data_footer_append(qd_message_t *message, qd_buffer_list_t *footer_props)
+{
+    // Compose a fresh FOOTER section and hand it the caller's buffers as its content.
+    qd_composed_field_t *footer = qd_compose(QD_PERFORMATIVE_FOOTER, 0);
+    qd_compose_insert_binary_buffers(footer, footer_props);
+
+    // Extend the message with the footer. The last argument is passed as 0
+    // (NOTE(review): presumably the optional q2_blocked out-parameter - confirm).
+    const int rc = qd_message_extend(message, footer, 0);
+
+    // The composed field is only a staging area; release it and report the
+    // value returned by qd_message_extend().
+    qd_compose_free(footer);
+    return rc;
+}
+
 int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t 
*data, bool *q2_blocked)
 {
     unsigned int        length = DEQ_SIZE(*data);
+
     qd_composed_field_t *field = 0;
     int rc = 0;
 
     if (q2_blocked)
         *q2_blocked = false;
 
-    if (length == 0)
-        return rc;
-
     // DISPATCH-1803: ensure no body data section can exceed the
     // QD_QLIMIT_Q2_LOWER.  This allows the egress router to wait for an entire
     // body data section to arrive and be validated before sending it out to
diff --git a/tests/http2_server.py b/tests/http2_server.py
index c22c07b..0acb91d 100644
--- a/tests/http2_server.py
+++ b/tests/http2_server.py
@@ -108,5 +108,13 @@ async def get_jpg_images():
     img_file = image_file("apache.jpg")
     return await send_file(img_file, mimetype='image/jpg')
 
+
+@app.route('/upload', methods=['POST'])
+async def process_upload_data():
+    """Read every file uploaded with the POST request, print its name and size, and reply "Success!"."""
+    uploaded_files = await request.files
+    for name, file in uploaded_files.items():
+        print(f'Processing {name}: {len(file.read())}')
+    return "Success!"
+
+
 #app.run(port=5000, certfile='cert.pem', keyfile='key.pem')
 app.run(port=os.getenv('SERVER_LISTEN_PORT'))
diff --git a/tests/http2_slow_q2_server.py b/tests/http2_slow_q2_server.py
new file mode 100644
index 0000000..6aa230d
--- /dev/null
+++ b/tests/http2_slow_q2_server.py
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import socket
+import signal
+import sys
+import os
+import h2.connection
+import h2.events
+import h2.config
+import h2.errors
+
+BYTES = 16384  # maximum number of bytes requested from the socket per recv() call
+
+
+def receive_signal(signalNumber, frame):
+    """Signal handler: print the received signal number and exit the process with status 0."""
+    print('Received:', signalNumber)
+    sys.exit(0)
+
+
+def send_response(event, conn):
+    """
+    Answer the stream the event belongs to: send a ':status' 200 header block
+    and then the body b'Success!' with end_stream=True.
+    """
+    sid = event.stream_id
+    response_headers = [(u':status', u'200'), (u'server', u'h2_slow_q2_server/0.1.0')]
+    conn.send_headers(stream_id=sid, headers=response_headers)
+    conn.send_data(stream_id=sid, data=b'Success!', end_stream=True)
+
+
+def handle_events(conn, events):
+    """
+    React to the h2 events produced by one chunk of received data.
+
+    DataReceived: grow the flow control window by only 1024 bytes, once with
+    stream_id None and once for the event's stream. The small increment is
+    what keeps the router short of send credit (pushing it into q2).
+    StreamEnded: the request is complete, so send the response.
+    """
+    for ev in events:
+        if isinstance(ev, h2.events.DataReceived):
+            for window_owner in (None, ev.stream_id):
+                conn.increment_flow_control_window(1024, window_owner)
+            continue
+        if isinstance(ev, h2.events.StreamEnded):
+            send_response(ev, conn)
+
+
+def handle(sock):
+    """
+    Serve one accepted TCP connection as an HTTP/2 server.
+
+    Returns when the peer closes the connection, a receive fails with an
+    OSError, or h2 raises while processing the received bytes. The socket
+    is always closed before returning.
+    """
+    config = h2.config.H2Configuration(client_side=False)
+
+    # The default initial window per the HTTP2 spec is 64K (65,535 bytes).
+    # That means that the router is allowed to send only that much before it
+    # needs more WINDOW_UPDATE frames providing credit to send more data.
+    conn = h2.connection.H2Connection(config=config)
+    try:
+        conn.initiate_connection()
+        sock.sendall(conn.data_to_send())
+
+        while True:
+            try:
+                data = sock.recv(BYTES)
+            except OSError:
+                # A failed receive ends this connection. Only OSError is caught:
+                # a bare except would also swallow the SystemExit raised by
+                # receive_signal() and keep the server running after a signal.
+                break
+            if not data:
+                break
+            try:
+                events = conn.receive_data(data)
+            except Exception as e:
+                print(e)
+                break
+            handle_events(conn, events)
+            data_to_send = conn.data_to_send()
+            if data_to_send:
+                sock.sendall(data_to_send)
+    finally:
+        sock.close()
+
+
+# Route all of these signals to receive_signal.
+for signum in (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, signal.SIGILL, signal.SIGTERM):
+    signal.signal(signum, receive_signal)
+
+# Listening socket; the port comes from the SERVER_LISTEN_PORT environment variable.
+sock = socket.socket()
+sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+listen_address = ('0.0.0.0', int(os.getenv('SERVER_LISTEN_PORT')))
+sock.bind(listen_address)
+sock.listen(5)
+
+while True:
+    # accept() blocks until a client connects and returns a tuple of
+    # (connection socket, peer address); only the socket is needed.
+    # Connections are served one at a time.
+    handle(sock.accept()[0])
diff --git a/tests/images/test.jpg b/tests/images/test.jpg
new file mode 100644
index 0000000..9ef14a3
Binary files /dev/null and b/tests/images/test.jpg differ
diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py
index cca7a8e..7d09f6f 100644
--- a/tests/system_tests_http2.py
+++ b/tests/system_tests_http2.py
@@ -155,6 +155,15 @@ class CommonHttp2Tests:
         self.assertIn('Success! Your first name is John, last name is Doe', out)
 
     @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
+    def test_post_upload_large_image_jpg(self):
+        # curl  -X POST -H "Content-Type: multipart/form-data"  -F "data=@/home/gmurthy/opensource/test.jpg"
+        # http://127.0.0.1:9000/upload --http2-prior-knowledge
+        address = self.router_qdra.http_addresses[0] + "/upload"
+        out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data',
+                                           '-F', 'data=@' + image_file('test.jpg')])
+        self.assertIn('Success', out)
+
+    @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
     def test_delete_request(self):
         # curl -X DELETE "http://127.0.0.1:9000/myinfo/delete/22122" -H  "accept: application/json" --http2-prior-knowledge
         address = self.router_qdra.http_addresses[0] + "/myinfo/delete/22122"
@@ -177,14 +186,14 @@ class CommonHttp2Tests:
 
     @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
     def test_404(self):
-        # Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge
-        address = self.router_qdra.http_addresses[0] + "/unavilable"
-        out = self.run_curl(address)
+        # Run curl 127.0.0.1:port/unavailable --http2-prior-knowledge
+        address = self.router_qdra.http_addresses[0] + "/unavailable"
+        out = self.run_curl(address=address)
         self.assertIn('404 Not Found', out)
 
     @unittest.skipIf(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
     def test_500(self):
-        # Run curl 127.0.0.1:port/unavilable --http2-prior-knowledge
+        # Run curl 127.0.0.1:port/test/500 --http2-prior-knowledge
         address = self.router_qdra.http_addresses[0] + "/test/500"
         out = self.run_curl(address)
         self.assertIn('500 Internal Server Error', out)
@@ -742,3 +751,126 @@ class Http2TestGoAway(Http2TestBase):
         address = self.router_qdra.http_addresses[0] + "/goaway_test_1"
         out = self.run_curl(address, args=["-i"])
         self.assertIn("HTTP/2 503", out)
+
+
+class Http2Q2OneRouterTest(Http2TestBase):
+    @classmethod
+    def setUpClass(cls):
+        super(Http2Q2OneRouterTest, cls).setUpClass()
+        if skip_h2_test():
+            return
+        cls.http2_server_name = "http2_slow_q2_server"
+        os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())  # read back below for server and connector
+        cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+                                                  listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+                                                  py_string='python3',
+                                                  server_file="http2_slow_q2_server.py")
+        name = "http2-test-router"
+        cls.connector_name = 'connectorToServer'
+        cls.connector_props = {
+            'port': os.getenv('SERVER_LISTEN_PORT'),
+            'address': 'examples',
+            'host': '127.0.0.1',
+            'protocolVersion': 'HTTP2',
+            'name': cls.connector_name
+        }
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'QDR'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+
+            ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
+                              'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+            ('httpConnector', cls.connector_props)
+        ])
+        cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
+
+    @unittest.skipIf(skip_h2_test(),
+                     "Python 3.7 or greater, hyper-h2 and curl needed to run hyperhttp2 tests")
+    def test_q2_block_unblock(self):
+        # curl  -X POST -H "Content-Type: multipart/form-data"  -F "data=@/home/gmurthy/opensource/test.jpg"
+        # http://127.0.0.1:9000/upload --http2-prior-knowledge
+        address = self.router_qdra.http_addresses[0] + "/upload"
+        out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data',
+                                           '-F', 'data=@' + image_file('test.jpg')])
+        self.assertIn('Success', out)
+        num_blocked = 0
+        num_unblocked = 0
+        blocked = "q2 is blocked"
+        unblocked = "q2 is unblocked"
+        with open(self.router_qdra.logfile_path, 'r') as router_log:  # count both markers in the router log
+            log_lines = router_log.read().split("\n")
+            for log_line in log_lines:
+                if unblocked in log_line:
+                    num_unblocked += 1
+                elif blocked in log_line:
+                    num_blocked += 1
+
+        self.assertGreater(num_blocked, 0)  # each marker must appear at least once
+        self.assertGreater(num_unblocked, 0)
+
+
+class Http2Q2TwoRouterTest(Http2TestBase):
+    @classmethod
+    def setUpClass(cls):
+        super(Http2Q2TwoRouterTest, cls).setUpClass()
+        if skip_h2_test():
+            return
+        cls.http2_server_name = "http2_server"
+        os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())  # read back below for server and connector
+        cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+                                                  listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+                                                  py_string='python3',
+                                                  server_file="http2_slow_q2_server.py")
+        qdr_a = "QDR.A"
+        inter_router_port = cls.tester.get_port()
+        config_qdra = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+            ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+            ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
+                              'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+            ('connector', {'name': 'connectorToB', 'role': 'inter-router',
+                           'port': inter_router_port,
+                           'verifyHostname': 'no'})
+        ])
+
+        qdr_b = "QDR.B"
+        cls.connector_name = 'serverConnector'
+        cls.http_connector_props = {
+            'port': os.getenv('SERVER_LISTEN_PORT'),
+            'address': 'examples',
+            'host': '127.0.0.1',
+            'protocolVersion': 'HTTP2',
+            'name': cls.connector_name
+        }
+        config_qdrb = Qdrouterd.Config([
+            ('router', {'mode': 'interior', 'id': 'QDR.B'}),
+            ('httpConnector', cls.http_connector_props),
+            ('listener', {'role': 'inter-router', 'maxSessionFrames': '10', 'port': inter_router_port})
+        ])
+        cls.router_qdrb = cls.tester.qdrouterd(qdr_b, config_qdrb, wait=True)
+        cls.router_qdra = cls.tester.qdrouterd(qdr_a, config_qdra, wait=True)
+        cls.router_qdra.wait_router_connected('QDR.B')
+
+    @unittest.skipIf(skip_h2_test(),
+                     "Python 3.7 or greater, hyper-h2 and curl needed to run hyperhttp2 tests")
+    def test_q2_block_unblock(self):
+        # curl  -X POST -H "Content-Type: multipart/form-data"  -F "data=@/home/gmurthy/opensource/test.jpg"
+        # http://127.0.0.1:9000/upload --http2-prior-knowledge
+        address = self.router_qdra.http_addresses[0] + "/upload"
+        out = self.run_curl(address, args=['-X', 'POST', '-H', 'Content-Type: multipart/form-data',
+                                           '-F', 'data=@' + image_file('test.jpg')])
+        self.assertIn('Success', out)
+        num_blocked = 0
+        num_unblocked = 0
+        blocked = "q2 is blocked"
+        unblocked = "q2 is unblocked"
+        with open(self.router_qdra.logfile_path, 'r') as router_log:  # count both markers in QDR.A's log
+            log_lines = router_log.read().split("\n")
+            for log_line in log_lines:
+                if unblocked in log_line:
+                    num_unblocked += 1
+                elif blocked in log_line:
+                    num_blocked += 1
+
+        self.assertGreater(num_blocked, 0)  # each marker must appear at least once
+        self.assertGreater(num_unblocked, 0)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to