This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 2ca0f75 DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test. 2ca0f75 is described below commit 2ca0f75b19263ee7cb335cfce01a930c01dfb9ba Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Dec 9 10:01:09 2020 -0500 DISPATCH-1876 - Added proper ref-count protection for the initial-delivery. Fixed leak of buffers when connections close very early. Enabled the connect-disconnect test. --- src/adaptors/tcp_adaptor.c | 26 +++++++++++++++++++++----- src/router_core/connections.c | 4 +++- tests/system_tests_tcp_adaptor.py | 1 - 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 2d7974d..7940705 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -123,6 +123,7 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) // Give proactor more read buffers for the socket if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %i read buffers", conn->conn_id, desired); while (desired) { size_t i; for (i = 0; i < desired && i < READ_BUFFERS; ++i) { @@ -144,7 +145,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn) // // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. // - if (!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) { + if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && !conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) { if (conn->ingress && !conn->reply_to) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id); } @@ -159,15 +160,23 @@ static int handle_incoming(qdr_tcp_connection_t *conn) pn_raw_buffer_t raw_buffers[READ_BUFFERS]; size_t n; int count = 0; + int free_count = 0; while ( (n = pn_raw_connection_take_read_buffers(conn->pn_raw_conn, raw_buffers, READ_BUFFERS)) ) { for (size_t i = 0; i < n && raw_buffers[i].bytes; ++i) { qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context; qd_buffer_insert(buf, raw_buffers[i].size); count += raw_buffers[i].size; - DEQ_INSERT_TAIL(buffers, buf); + if (raw_buffers[i].size > 0) { + DEQ_INSERT_TAIL(buffers, buf); + } else { + qd_buffer_free(buf); + free_count++; + } } } + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %i read buffers", conn->conn_id, DEQ_SIZE(buffers)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count); grant_read_buffers(conn); if (conn->instream) { @@ -242,11 +251,11 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); - qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected"); + qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream"); } if (conn->outstream) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id); - qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected"); + qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream"); } if (conn->incoming) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id); @@ -262,6 +271,8 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) } if (conn->initial_delivery) { qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, 0, false); + qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery"); + conn->initial_delivery = 0; } //need to free on core thread to avoid deleting while in use by management agent @@ -512,7 +523,6 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address); if (!!conn->initial_delivery) { qdr_tcp_open_server_side_connection(conn); - conn->initial_delivery = 0; } while (qdr_connection_process(conn->qdr_conn)) {} handle_outgoing(conn); @@ -660,6 +670,10 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) !(tc->egress_dispatcher), tc->initial_delivery, &(tc->outgoing_id)); + if (!!tc->initial_delivery) { + qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link"); + tc->initial_delivery = 0; + } qdr_link_set_context(tc->outgoing, tc); } @@ -672,6 +686,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi if (initial_delivery) { tc->egress_dispatcher = false; tc->initial_delivery = initial_delivery; + qdr_delivery_incref(initial_delivery, "qdr_tcp_connection_egress - held initial delivery"); } else { tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); tc->egress_dispatcher = true; @@ -1051,6 +1066,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event dlv:%lx", tc->conn_id, tc->outgoing_id, delivery); if (tc->egress_dispatcher) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] tcp_adaptor initiating egress connection %p", tc->conn_id, tc->outgoing_id, delivery); qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); return QD_DELIVERY_MOVED_TO_NEW_LINK; } else if (!tc->outstream) { diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 5021b92..8bbc88d 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -630,6 +630,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, action->args.connection.source = source; action->args.connection.target = target; action->args.connection.initial_delivery = initial_delivery; + if (!!initial_delivery) + qdr_delivery_incref(initial_delivery, "qdr_link_first_attach - protect delivery in action list"); qdr_action_enqueue(conn->core, action); return link; @@ -1607,7 +1609,7 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) { qdr_link_t *old_link = safe_deref_qdr_link_t(dlv->link_sp); - int ref_delta = 0; + int ref_delta = -1; // Account for the action-list protection // // Remove the delivery from its current link if needed diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 4c4131f..e73ec9a 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -674,7 +674,6 @@ class TcpAdaptor(TestCase): @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) def test_20_tcp_connect_disconnect(self): - self.skipTest("DISPATCH-1876 reproducer: disabled until DISPATCH-1876 is fixed") name = "test_20_tcp_connect_disconnect" self.logger.log("TCP_TEST Start %s" % name) pairs = [self.EchoPair(self.INTA, self.INTA, sizes=[0])] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org