[qpid-dispatch] branch master updated: DISPATCH-1947: Add self test to verify that Q2 limit is in effect

2021-02-23 Thread chug
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
 new ffc4c9c  DISPATCH-1947: Add self test to verify that Q2 limit is in effect
ffc4c9c is described below

commit ffc4c9cc6d1871cf517754b5f060988229b6cec9
Author: Chuck Rolke 
AuthorDate: Tue Feb 23 17:09:09 2021 -0500

DISPATCH-1947: Add self test to verify that Q2 limit is in effect
---
 tests/system_tests_tcp_adaptor.py | 16 
 1 file changed, 16 insertions(+)

diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 09a50ec..bbe02dc 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -22,6 +22,7 @@ from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
 
+import io
 import os
 import sys
 import time
@@ -678,6 +679,21 @@ class TcpAdaptor(TestCase):
 print(result)
 sys.stdout.flush()
 assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result)
+
+# search the router log file to verify Q2 was hit
+time.sleep(0.005)  # wait for log file to be written
+block_ct = 0
+unblock_ct = 0
+with io.open(self.INTA.logfile_path) as f:
+for line in f:
+if 'client link blocked on Q2 limit' in line:
+block_ct += 1
+if 'client link unblocked from Q2 limit' in line:
+unblock_ct += 1
+self.assertTrue(block_ct > 0)
+self.assertEqual(block_ct, unblock_ct)
+
+
 self.logger.log("TCP_TEST Stop %s SUCCESS" % name)
 
 @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-dispatch] branch master updated: DISPATCH-1947: TCP Adaptor flow control

2021-02-23 Thread chug
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
 new c6b2055  DISPATCH-1947: TCP Adaptor flow control
c6b2055 is described below

commit c6b205574ed5925cde40d8f126902d28ff5a32e2
Author: Chuck Rolke 
AuthorDate: Tue Feb 23 16:48:05 2021 -0500

DISPATCH-1947: TCP Adaptor flow control

This closes #1056
---
 src/adaptors/tcp_adaptor.c | 88 --
 1 file changed, 78 insertions(+), 10 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 69bed1a..0b83123 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include "tcp_adaptor.h"
 #include 
 #include 
 #include 
@@ -26,7 +27,6 @@
 #include "qpid/dispatch/ctools.h"
 #include "qpid/dispatch/protocol_adaptor.h"
 #include "delivery.h"
-#include "tcp_adaptor.h"
 #include 
 #include 
 
@@ -78,6 +78,9 @@ struct qdr_tcp_connection_t {
 int outgoing_buff_count;  // number of buffers with data
 int outgoing_buff_idx;// first buffer with data
 
+sys_atomic_t q2_restart;  // signal to resume receive
+bool q2_blocked;  // stop reading from raw conn
+
 DEQ_LINKS(qdr_tcp_connection_t);
 };
 
@@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
 }
 }
 
-static int handle_incoming(qdr_tcp_connection_t *conn)
+
+// Per-message callback to resume receiving after Q2 is unblocked on the
+// incoming link.
+// This routine must be thread safe: the thread on which it is running
+// is not an IO thread that owns the underlying pn_raw_conn.
+//
+void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
+{
+qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context);
+if (tc == 0) {
+// bad news.
+assert(false);
+return;
+}
+
+// prevent the tc from being deleted while running:
+sys_mutex_lock(tc->activation_lock);
+
+if (tc && tc->pn_raw_conn) {
+sys_atomic_set(&tc->q2_restart, 1);
+pn_raw_connection_wake(tc->pn_raw_conn);
+}
+
+sys_mutex_unlock(tc->activation_lock);
+}
+
+
+// Fetch incoming raw incoming buffers from proton and pass them to
+// existing delivery or create a new delivery.
+// If close is pending then do not give more buffers to proton.
+static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending)
 {
 //
 // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit.
@@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
 return 0;
 }
 
+//
+// Don't read from proton if in Q2 holdoff
+//
+if (conn->q2_blocked) {
+qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming q2_blocked", conn->conn_id);
+return 0;
+}
+
+// Read all buffers available from proton.
+// Collect buffers for ingress; free empty buffers.
 qd_buffer_list_t buffers;
 DEQ_INIT(buffers);
 pn_raw_buffer_t raw_buffers[READ_BUFFERS];
@@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
 }
 }
 }
-
 qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers));
 qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count);
-grant_read_buffers(conn);
+
+// Only grant more buffers to proton for reading if close is not pending
+if (!close_pending) {
+grant_read_buffers(conn);
+}
 
 if (conn->instream) {
-// @TODO(kgiusti): handle Q2 block event:
-qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0);
+qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked);
+if (conn->q2_blocked) {
+// note: unit tests grep for this log!
+qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", conn->conn_id);
+}
 qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
 qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count);
 } else {
@@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
 qd_message_compose_2(msg, props, false);
 qd_compose_free(props);
 
+// set up message q2 unblocked callback handler
+qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn);
+qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, 
c