This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new c6b2055  DISPATCH-1947: TCP Adaptor flow control
c6b2055 is described below

commit c6b205574ed5925cde40d8f126902d28ff5a32e2
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Tue Feb 23 16:48:05 2021 -0500

    DISPATCH-1947: TCP Adaptor flow control
    
    This closes #1056
---
 src/adaptors/tcp_adaptor.c | 88 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 78 insertions(+), 10 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 69bed1a..0b83123 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include "tcp_adaptor.h"
 #include <proton/condition.h>
 #include <proton/listener.h>
 #include <proton/netaddr.h>
@@ -26,7 +27,6 @@
 #include "qpid/dispatch/ctools.h"
 #include "qpid/dispatch/protocol_adaptor.h"
 #include "delivery.h"
-#include "tcp_adaptor.h"
 #include <stdio.h>
 #include <inttypes.h>
 
@@ -78,6 +78,9 @@ struct qdr_tcp_connection_t {
     int                     outgoing_buff_count;  // number of buffers with 
data
     int                     outgoing_buff_idx;    // first buffer with data
 
+    sys_atomic_t            q2_restart;      // signal to resume receive
+    bool                    q2_blocked;      // stop reading from raw conn
+
     DEQ_LINKS(qdr_tcp_connection_t);
 };
 
@@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
     }
 }
 
-static int handle_incoming(qdr_tcp_connection_t *conn)
+
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link.
+// This routine must be thread safe: the thread on which it is running
+// is not an IO thread that owns the underlying pn_raw_conn.
+//
+void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+    qdr_tcp_connection_t *tc = 
(qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context);
+    if (tc == 0) {
+        // bad news.
+        assert(false);
+        return;
+    }
+
+    // prevent the tc from being deleted while running:
+    sys_mutex_lock(tc->activation_lock);
+
+    if (tc && tc->pn_raw_conn) {
+        sys_atomic_set(&tc->q2_restart, 1);
+        pn_raw_connection_wake(tc->pn_raw_conn);
+    }
+
+    sys_mutex_unlock(tc->activation_lock);
+}
+
+
+// Fetch incoming raw incoming buffers from proton and pass them to
+// existing delivery or create a new delivery.
+// If close is pending then do not give more buffers to proton.
+static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending)
 {
     //
     // Don't initiate an ingress stream message if we don't yet have a 
reply-to address and credit.
@@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
         return 0;
     }
 
+    //
+    // Don't read from proton if in Q2 holdoff
+    //
+    if (conn->q2_blocked) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
handle_incoming q2_blocked", conn->conn_id);
+        return 0;
+    }
+
+    // Read all buffers available from proton.
+    // Collect buffers for ingress; free empty buffers.
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
@@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
             }
         }
     }
-
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read 
buffers", conn->conn_id, DEQ_SIZE(buffers));
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read 
buffers", conn->conn_id, free_count);
-    grant_read_buffers(conn);
+
+    // Only grant more buffers to proton for reading if close is not pending
+    if (!close_pending) {
+        grant_read_buffers(conn);
+    }
 
     if (conn->instream) {
-        // @TODO(kgiusti): handle Q2 block event:
-        qd_message_stream_data_append(qdr_delivery_message(conn->instream), 
&buffers, 0);
+        qd_message_stream_data_append(qdr_delivery_message(conn->instream), 
&buffers, &conn->q2_blocked);
+        if (conn->q2_blocked) {
+            // note: unit tests grep for this log!
+            qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client 
link blocked on Q2 limit", conn->conn_id);
+        }
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, 
conn->incoming_id, count);
     } else {
@@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
         qd_message_compose_2(msg, props, false);
         qd_compose_free(props);
 
+        // set up message q2 unblocked callback handler
+        qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn);
+        qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, 
conn_sp);
+
         conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 
0, 0);
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, 
conn->incoming_id, count);
     }
@@ -237,10 +290,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
 }
 
 
+static int handle_incoming(qdr_tcp_connection_t *conn)
+{
+    // Normal incoming runs with no close pending
+    return handle_incoming_impl(conn, false);
+}
+
 static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
 {
     // Flush buffers staged for writing to raw conn
-    // and free possible references to stream data objects.
+    // and release any references to stream data objects.
     if (conn->outgoing_buff_count > 0) {
         for (size_t i = conn->outgoing_buff_idx;
             i < conn->outgoing_buff_idx + conn->outgoing_buff_count;
@@ -263,10 +322,10 @@ static void flush_outgoing_buffs(qdr_tcp_connection_t 
*conn)
 
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing 
tcp_connection %p", tc->conn_id, (void*) tc);
     free(tc->reply_to);
     free(tc->remote_address);
     free(tc->global_id);
+    sys_atomic_destroy(&tc->q2_restart);
     if (tc->activate_timer) {
         qd_timer_free(tc->activate_timer);
     }
@@ -278,7 +337,6 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* 
tc)
 
 static void handle_disconnected(qdr_tcp_connection_t* conn)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
handle_disconnected", conn->conn_id);
     if (conn->instream) {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, 
conn->incoming_id);
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
@@ -552,8 +610,8 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     switch (pn_event_type(e)) {
     case PN_RAW_CONNECTION_CONNECTED: {
         if (conn->ingress) {
-            qdr_tcp_connection_ingress_accept(conn);
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED 
Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, 
conn->config.host_port, conn->remote_address, conn->global_id);
+            qdr_tcp_connection_ingress_accept(conn);
             break;
         } else {
             conn->remote_address = get_address_string(conn->pn_raw_conn);
@@ -569,6 +627,8 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
+        conn->q2_blocked = false;
+        handle_incoming_impl(conn, true);
         conn->raw_closed_read = true;
         pn_raw_connection_close(conn->pn_raw_conn);
         break;
@@ -601,6 +661,12 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_WAKE: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", 
conn->conn_id);
+        if (sys_atomic_set(&conn->q2_restart, 0)) {
+            // note: unit tests grep for this log!
+            qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from 
Q2 limit", conn->conn_id);
+            conn->q2_blocked = false;
+            handle_incoming(conn);
+        }
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -646,6 +712,7 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
     tc->context.handler = &handle_connection_event;
     tc->config = listener->config;
     tc->server = listener->server;
+    sys_atomic_init(&tc->q2_restart, 0);
     tc->pn_raw_conn = pn_raw_connection();
     pn_raw_connection_set_context(tc->pn_raw_conn, tc);
     //the following call will cause a PN_RAW_CONNECTION_CONNECTED
@@ -739,6 +806,7 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
     tc->context.handler = &handle_connection_event;
     tc->config = *config;
     tc->server = server;
+    sys_atomic_init(&tc->q2_restart, 0);
     tc->conn_id = qd_server_allocate_connection_id(tc->server);
 
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to