This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new 733c1df  DISPATCH-2192: disable tcp window on terminal outcome or settlement
733c1df is described below

commit 733c1dfc80edd0c3505d3dc62c9f6562381239f7
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Thu Jul 8 16:08:26 2021 -0400

    DISPATCH-2192: disable tcp window on terminal outcome or settlement
    
    This closes #1288
---
 src/adaptors/tcp_adaptor.c | 72 ++++++++++++++++++++++++++--------------------
 1 file changed, 41 insertions(+), 31 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index a5bd0d4..1de86aa 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -83,6 +83,7 @@ struct qdr_tcp_connection_t {
     sys_atomic_t          raw_closed_write;  // proton event seen or write_close called
     bool                  raw_read_shutdown; // stream closed
     bool                  read_eos_seen;
+    bool                  window_disabled;   // true: ignore unacked byte window
     qdr_delivery_t       *initial_delivery;
     qd_timer_t           *activate_timer;
     qd_tcp_bridge_t      *bridge;         // config and stats
@@ -140,6 +141,15 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 static void free_bridge_config(qd_tcp_bridge_t *config);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
 
+
+// is the incoming byte window full
+//
+inline static bool read_window_full(const qdr_tcp_connection_t* conn)
+{
+    return !conn->window_disabled && conn->bytes_unacked >= TCP_MAX_CAPACITY;
+}
+
+
 static void allocate_tcp_buffer(pn_raw_buffer_t *buffer)
 {
     buffer->bytes = malloc(TCP_BUFFER_SIZE);
@@ -239,7 +249,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
 static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers)
 {
     pn_raw_buffer_t raw_buffer;
-    if ( conn->bytes_unacked >= TCP_MAX_CAPACITY || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) {
+    if (read_window_full(conn) || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) {
         return 0;
     }
     int result = raw_buffer.size;
@@ -263,7 +273,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t
         conn->bridge->bytes_in += result;
         UNLOCK(conn->bridge->stats_lock);
         conn->bytes_unacked += result;
-        if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
+        if (read_window_full(conn)) {
             qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
                    "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64,
                    conn->conn_id, conn->bytes_in, conn->bytes_unacked);
@@ -1613,40 +1623,40 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
             pn_raw_connection_close(tc->pn_raw_conn);
         }
 
-        if (disp == PN_RECEIVED) {
-            //
-            // the consumer of this TCP flow has updated its tx_sequence:
-            //
-            bool window_opened = false;
-            uint64_t ignore;
-            qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+        // handle read window updates
 
-            if (!dstate) {
-                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
-                       "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id);
-            } else {
-                // note: the PN_RECEIVED is generated by the remote TCP
-                // adaptor, for simplicity we ignore the section_number since
-                // all we really need is a byte offset:
+        const bool window_was_full = read_window_full(tc);
+        tc->window_disabled = settled || tc->window_disabled;
+
+        if (!tc->window_disabled) {
+
+            if (disp == PN_RECEIVED) {
+                //
+                // the consumer of this TCP flow has updated its tx_sequence:
                 //
-                const bool was_closed = tc->bytes_unacked >= TCP_MAX_CAPACITY;
-                tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
-                window_opened = tc->bytes_unacked < TCP_MAX_CAPACITY;
-                if (was_closed && window_opened) {
-                    qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
-                           "[C%"PRIu64"] TCP RX window OPEN: bytes in=%"PRIu64
-                           " unacked=%"PRIu64" remote bytes out=%"PRIu64,
-                           tc->conn_id, tc->bytes_in, tc->bytes_unacked,
-                           dstate->section_offset);
+                uint64_t ignore;
+                qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+
+                if (!dstate) {
+                    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                           "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id);
+                } else {
+                    // note: the PN_RECEIVED is generated by the remote TCP
+                    // adaptor, for simplicity we ignore the section_number since
+                    // all we really need is a byte offset:
+                    //
+                    tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
+                    qd_delivery_state_free(dstate);
                 }
+            } else if (disp) {
+                // terminal outcome: drain any pending receive data
+                tc->window_disabled = true;
             }
+        }
 
-            qd_delivery_state_free(dstate);
-
-            if (window_opened) {
-                // now that the window has opened fetch any outstanding read data
-                handle_incoming(tc, "TCP RX window refresh");
-            }
+        if (window_was_full && !read_window_full(tc)) {
+            // now that the window has opened fetch any outstanding read data
+            handle_incoming(tc, "TCP RX window refresh");
         }
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to