This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 9c58c5e  DISPATCH-1826 - Fixes for:
         - Accumulated temporary address records
         - Race condition where ingress message is sent before reply-to is established
         - Deliveries, messages, and buffers are leaked when connections close
9c58c5e is described below

commit 9c58c5ed4c64dc8cfba4bbf6c83ae9a8255120a0
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Nov 4 12:39:01 2020 -0500

    DISPATCH-1826 - Fixes for:
        - Accumulated temporary address records
        - Race condition where ingress message is sent before reply-to is established
        - Deliveries, messages, and buffers are leaked when connections close
---
 src/adaptors/tcp_adaptor.c | 37 ++++++++++++++++++++++++-------------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 9a2ed5d..d006d89 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -53,7 +53,7 @@ struct qdr_tcp_connection_t {
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if egress_dispatcher=true
     qd_timer_t           *activate_timer;
-    qd_bridge_config_t   config;
+    qd_bridge_config_t    config;
     qd_server_t          *server;
     char                 *remote_address;
     char                 *global_id;
@@ -130,6 +130,12 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
 
 static int handle_incoming(qdr_tcp_connection_t *conn)
 {
+    //
+    // Don't initiate an ingress stream message if we don't yet have a reply-to address.
+    //
+    if (!conn->instream && conn->ingress && !conn->reply_to)
+        return 0;
+
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
@@ -163,10 +169,12 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
             qd_compose_insert_string(props, conn->config.address); // to
             qd_compose_insert_string(props, conn->global_id);   // subject
             qd_compose_insert_string(props, conn->reply_to);    // reply-to
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating ingress to: %s reply: %s", conn->conn_id, conn->incoming_id, conn->config.address, conn->reply_to);
         } else {
             qd_compose_insert_string(props, conn->reply_to); // to
             qd_compose_insert_string(props, conn->global_id);   // subject
             qd_compose_insert_null(props);    // reply-to
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating egress to: %s", conn->conn_id, conn->incoming_id, conn->reply_to);
         }
         //qd_compose_insert_null(props);                      // correlation-id
         //qd_compose_insert_null(props);                      // content-type
@@ -195,15 +203,9 @@ static int handle_incoming(qdr_tcp_connection_t *conn)
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc)
 {
     qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc);
-    if (tc->reply_to) {
-        free(tc->reply_to);
-    }
-    if(tc->remote_address) {
-        free(tc->remote_address);
-    }
-    if(tc->global_id) {
-        free(tc->global_id);
-    }
+    free(tc->reply_to);
+    free(tc->remote_address);
+    free(tc->global_id);
     if (tc->activate_timer) {
         qd_timer_free(tc->activate_timer);
     }
@@ -220,6 +222,16 @@ static void handle_disconnected(qdr_tcp_connection_t* conn)
     if (conn->instream) {
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
+        qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected");
+    }
+    if (conn->outstream) {
+        qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected");
+    }
+    if (conn->incoming) {
+        qdr_link_detach(conn->incoming, QD_LOST, 0);
+    }
+    if (conn->outgoing) {
+        qdr_link_detach(conn->outgoing, QD_LOST, 0);
     }
     qdr_connection_closed(conn->conn);
     qdr_connection_set_context(conn->conn, 0);
@@ -850,9 +862,7 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link
 
 static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to)
 {
-    int length = qd_iterator_length(reply_to);
-    tc->reply_to = malloc(length + 1);
-    qd_iterator_strncpy(reply_to, tc->reply_to, length + 1);
+    tc->reply_to = (char*)  qd_iterator_copy(reply_to);
 }
 
 static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_iterator_t* subject)
@@ -928,6 +938,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t
                 qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
             } else if (!tc->outstream) {
                 tc->outstream = delivery;
+                qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");
                 if (!tc->ingress) {
                     //on egress, can only set up link for the reverse
                     //direction once we receive the first part of the


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to