This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 733c1df DISPATCH-2192: disable tcp window on terminal outcome or settlement 733c1df is described below commit 733c1dfc80edd0c3505d3dc62c9f6562381239f7 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Thu Jul 8 16:08:26 2021 -0400 DISPATCH-2192: disable tcp window on terminal outcome or settlement This closes #1288 --- src/adaptors/tcp_adaptor.c | 72 ++++++++++++++++++++++++++-------------------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index a5bd0d4..1de86aa 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -83,6 +83,7 @@ struct qdr_tcp_connection_t { sys_atomic_t raw_closed_write; // proton event seen or write_close called bool raw_read_shutdown; // stream closed bool read_eos_seen; + bool window_disabled; // true: ignore unacked byte window qdr_delivery_t *initial_delivery; qd_timer_t *activate_timer; qd_tcp_bridge_t *bridge; // config and stats @@ -140,6 +141,15 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); static void free_bridge_config(qd_tcp_bridge_t *config); static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc); + +// is the incoming byte window full +// +inline static bool read_window_full(const qdr_tcp_connection_t* conn) +{ + return !conn->window_disabled && conn->bytes_unacked >= TCP_MAX_CAPACITY; +} + + static void allocate_tcp_buffer(pn_raw_buffer_t *buffer) { buffer->bytes = malloc(TCP_BUFFER_SIZE); @@ -239,7 +249,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers) { pn_raw_buffer_t raw_buffer; - if ( conn->bytes_unacked >= TCP_MAX_CAPACITY || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) { + if (read_window_full(conn) || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) { return 0; } int result = raw_buffer.size; @@ -263,7 +273,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t conn->bridge->bytes_in += result; UNLOCK(conn->bridge->stats_lock); conn->bytes_unacked += result; - if (conn->bytes_unacked >= TCP_MAX_CAPACITY) { + if (read_window_full(conn)) { qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" unacked=%"PRIu64, conn->conn_id, conn->bytes_in, conn->bytes_unacked); @@ -1613,40 +1623,40 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t pn_raw_connection_close(tc->pn_raw_conn); } - if (disp == PN_RECEIVED) { - // - // the consumer of this TCP flow has updated its tx_sequence: - // - bool window_opened = false; - uint64_t ignore; - qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore); + // handle read window updates - if (!dstate) { - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, - "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id); - } else { - // note: the PN_RECEIVED is generated by the remote TCP - // adaptor, for simplicity we ignore the section_number since - // all we really need is a byte offset: + const bool window_was_full = read_window_full(tc); + tc->window_disabled = settled || tc->window_disabled; + + if (!tc->window_disabled) { + + if (disp == PN_RECEIVED) { + // + // the consumer of this TCP flow has updated its tx_sequence: // - const bool was_closed = tc->bytes_unacked >= TCP_MAX_CAPACITY; - tc->bytes_unacked = tc->bytes_in - dstate->section_offset; - window_opened = tc->bytes_unacked < TCP_MAX_CAPACITY; - if (was_closed && window_opened) { - qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, - "[C%"PRIu64"] TCP RX window OPEN: bytes in=%"PRIu64 - " unacked=%"PRIu64" remote bytes out=%"PRIu64, - tc->conn_id, tc->bytes_in, tc->bytes_unacked, - dstate->section_offset); + uint64_t ignore; + qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore); + + if (!dstate) { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, + "[C%"PRIu64"] BAD PN_RECEIVED - missing delivery-state!!", tc->conn_id); + } else { + // note: the PN_RECEIVED is generated by the remote TCP + // adaptor, for simplicity we ignore the section_number since + // all we really need is a byte offset: + // + tc->bytes_unacked = tc->bytes_in - dstate->section_offset; + qd_delivery_state_free(dstate); } + } else if (disp) { + // terminal outcome: drain any pending receive data + tc->window_disabled = true; } + } - qd_delivery_state_free(dstate); - - if (window_opened) { - // now that the window has opened fetch any outstanding read data - handle_incoming(tc, "TCP RX window refresh"); - } + if (window_was_full && !read_window_full(tc)) { + // now that the window has opened fetch any outstanding read data + handle_incoming(tc, "TCP RX window refresh"); } } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org