This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new c6b2055 DISPATCH-1947: TCP Adaptor flow control
c6b2055 is described below
commit c6b205574ed5925cde40d8f126902d28ff5a32e2
Author: Chuck Rolke
AuthorDate: Tue Feb 23 16:48:05 2021 -0500
DISPATCH-1947: TCP Adaptor flow control
This closes #1056
---
src/adaptors/tcp_adaptor.c | 88 --
1 file changed, 78 insertions(+), 10 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 69bed1a..0b83123 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -17,6 +17,7 @@
* under the License.
*/
+#include "tcp_adaptor.h"
#include
#include
#include
@@ -26,7 +27,6 @@
#include "qpid/dispatch/ctools.h"
#include "qpid/dispatch/protocol_adaptor.h"
#include "delivery.h"
-#include "tcp_adaptor.h"
#include
#include
@@ -78,6 +78,9 @@ struct qdr_tcp_connection_t {
int outgoing_buff_count; // number of buffers with
data
int outgoing_buff_idx;// first buffer with data
+sys_atomic_tq2_restart; // signal to resume receive
+boolq2_blocked; // stop reading from raw conn
+
DEQ_LINKS(qdr_tcp_connection_t);
};
@@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
}
}
-static int handle_incoming(qdr_tcp_connection_t *conn)
+
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link.
+// This routine must be thread safe: the thread on which it is running
+// is not an IO thread that owns the underlying pn_raw_conn.
+//
+void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+qdr_tcp_connection_t *tc =
(qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context);
+if (tc == 0) {
+// bad news.
+assert(false);
+return;
+}
+
+// prevent the tc from being deleted while running:
+sys_mutex_lock(tc->activation_lock);
+
+if (tc && tc->pn_raw_conn) {
+sys_atomic_set(&tc->q2_restart, 1);
+pn_raw_connection_wake(tc->pn_raw_conn);
+}
+
+sys_mutex_unlock(tc->activation_lock);
+}
+
+
+// Fetch incoming raw incoming buffers from proton and pass them to
+// existing delivery or create a new delivery.
+// If close is pending then do not give more buffers to proton.
+static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending)
{
//
// Don't initiate an ingress stream message if we don't yet have a
reply-to address and credit.
@@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
return 0;
}
+//
+// Don't read from proton if in Q2 holdoff
+//
+if (conn->q2_blocked) {
+qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"]
handle_incoming q2_blocked", conn->conn_id);
+return 0;
+}
+
+// Read all buffers available from proton.
+// Collect buffers for ingress; free empty buffers.
qd_buffer_list_t buffers;
DEQ_INIT(buffers);
pn_raw_buffer_t raw_buffers[READ_BUFFERS];
@@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
}
}
}
-
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read
buffers", conn->conn_id, DEQ_SIZE(buffers));
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read
buffers", conn->conn_id, free_count);
-grant_read_buffers(conn);
+
+// Only grant more buffers to proton for reading if close is not pending
+if (!close_pending) {
+grant_read_buffers(conn);
+}
if (conn->instream) {
-// @TODO(kgiusti): handle Q2 block event:
-qd_message_stream_data_append(qdr_delivery_message(conn->instream),
&buffers, 0);
+qd_message_stream_data_append(qdr_delivery_message(conn->instream),
&buffers, &conn->q2_blocked);
+if (conn->q2_blocked) {
+// note: unit tests grep for this log!
+qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client
link blocked on Q2 limit", conn->conn_id);
+}
qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
"[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id,
conn->incoming_id, count);
} else {
@@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
qd_message_compose_2(msg, props, false);
qd_compose_free(props);
+// set up message q2 unblocked callback handler
+qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn);
+qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler,
c