This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 9c58c5e DISPATCH-1826 - Fixes for: - Accumulated temporary address records - Race condition where ingress message is sent before reply-to is established - Deliveries, messages, and buffers are leaked when connections close 9c58c5e is described below commit 9c58c5ed4c64dc8cfba4bbf6c83ae9a8255120a0 Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Nov 4 12:39:01 2020 -0500 DISPATCH-1826 - Fixes for: - Accumulated temporary address records - Race condition where ingress message is sent before reply-to is established - Deliveries, messages, and buffers are leaked when connections close --- src/adaptors/tcp_adaptor.c | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 9a2ed5d..d006d89 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -53,7 +53,7 @@ struct qdr_tcp_connection_t { bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true qd_timer_t *activate_timer; - qd_bridge_config_t config; + qd_bridge_config_t config; qd_server_t *server; char *remote_address; char *global_id; @@ -130,6 +130,12 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) static int handle_incoming(qdr_tcp_connection_t *conn) { + // + // Don't initiate an ingress stream message if we don't yet have a reply-to address. + // + if (!conn->instream && conn->ingress && !conn->reply_to) + return 0; + qd_buffer_list_t buffers; DEQ_INIT(buffers); pn_raw_buffer_t raw_buffers[READ_BUFFERS]; @@ -163,10 +169,12 @@ static int handle_incoming(qdr_tcp_connection_t *conn) qd_compose_insert_string(props, conn->config.address); // to qd_compose_insert_string(props, conn->global_id); // subject qd_compose_insert_string(props, conn->reply_to); // reply-to + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating ingress to: %s reply: %s", conn->conn_id, conn->incoming_id, conn->config.address, conn->reply_to); } else { qd_compose_insert_string(props, conn->reply_to); // to qd_compose_insert_string(props, conn->global_id); // subject qd_compose_insert_null(props); // reply-to + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating egress to: %s", conn->conn_id, conn->incoming_id, conn->reply_to); } //qd_compose_insert_null(props); // correlation-id //qd_compose_insert_null(props); // content-type @@ -195,15 +203,9 @@ static int handle_incoming(qdr_tcp_connection_t *conn) static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); - if (tc->reply_to) { - free(tc->reply_to); - } - if(tc->remote_address) { - free(tc->remote_address); - } - if(tc->global_id) { - free(tc->global_id); - } + free(tc->reply_to); + free(tc->remote_address); + free(tc->global_id); if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } @@ -220,6 +222,16 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) if (conn->instream) { qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); + qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected"); + } + if (conn->outstream) { + qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected"); + } + if (conn->incoming) { + qdr_link_detach(conn->incoming, QD_LOST, 0); + } + if (conn->outgoing) { + qdr_link_detach(conn->outgoing, QD_LOST, 0); } qdr_connection_closed(conn->conn); qdr_connection_set_context(conn->conn, 0); @@ -850,9 +862,7 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to) { - int length = qd_iterator_length(reply_to); - tc->reply_to = malloc(length + 1); - qd_iterator_strncpy(reply_to, tc->reply_to, length + 1); + tc->reply_to = (char*) qd_iterator_copy(reply_to); } static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_iterator_t* subject) @@ -928,6 +938,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); } else if (!tc->outstream) { tc->outstream = delivery; + qdr_delivery_incref(delivery, "tcp_adaptor - new outstream"); if (!tc->ingress) { //on egress, can only set up link for the reverse //direction once we receive the first part of the --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org