This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new d288c21  DISPATCH-1878: Handle half-closed TCP connections without 
losing data
d288c21 is described below

commit d288c21fe5ecc7a2d78eaf1f38e973fce2356e45
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Fri Apr 23 09:27:15 2021 -0400

    DISPATCH-1878: Handle half-closed TCP connections without losing data
    
    Restructure the code several ways to handle closed reads and closed
    writes without necessarily closing the whole connection and losing data
    in flight.
    
    This is a squashed and rebased commit of work during development and
    tracked under PR#1129.
    
    This closes #1129
---
 src/adaptors/tcp_adaptor.c | 539 ++++++++++++++++++++++++++++++---------------
 1 file changed, 365 insertions(+), 174 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index eb10f8a..9fcf267 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -57,11 +57,14 @@ struct qdr_tcp_connection_t {
     qdr_delivery_t       *outstream;
     bool                  ingress;
     bool                  flow_enabled;
+    bool                  incoming_started;
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if 
egress_dispatcher=true
     bool                  in_list;         // This connection is in the 
adaptor's connections list
-    bool                  raw_closed_read;
-    bool                  raw_closed_write;
+    bool                  raw_closed_read;   // proton event seen
+    bool                  raw_closed_write;  // proton event seen or 
write_close called
+    bool                  raw_read_shutdown; // stream closed
+    bool                  read_eos_seen;
     qdr_delivery_t       *initial_delivery;
     qd_timer_t           *activate_timer;
     qd_bridge_config_t    config;
@@ -116,6 +119,26 @@ static inline uint64_t qdr_tcp_conn_linkid(const 
qdr_tcp_connection_t *conn)
     return conn->instream ? conn->incoming_id : conn->outgoing_id;
 }
 
+static inline const char * qdr_link_direction_name(const qdr_link_t *link)
+{
+    assert(link);
+    return qdr_link_direction(link) == QD_OUTGOING ? "outgoing" : "incoming";
+}
+
+static inline const char * qdr_tcp_connection_role_name(const 
qdr_tcp_connection_t *tc)
+{
+    assert(tc);
+    return tc->ingress ? "listener" : "connector";
+}
+
+static const char * qdr_tcp_quadrant_id(const qdr_tcp_connection_t *tc, const 
qdr_link_t *link)
+{
+    if (tc->ingress)
+        return link->link_direction == QD_INCOMING ? "(listener incoming)" : 
"(listener outgoing)";
+    else
+        return link->link_direction == QD_INCOMING ? "(connector incoming)" : 
"(connector outgoing)";
+}
+
 static void on_activate(void *context)
 {
     qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
@@ -136,22 +159,22 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
 
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     // Give proactor more read buffers for the socket
-    if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
-        size_t desired = 
pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted 
%zu read buffers", conn->conn_id, desired);
-        while (desired) {
-            size_t i;
-            for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
-                qd_buffer_t *buf = qd_buffer();
-                raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
-                raw_buffers[i].capacity = qd_buffer_capacity(buf);
-                raw_buffers[i].size = 0;
-                raw_buffers[i].offset = 0;
-                raw_buffers[i].context = (uintptr_t) buf;
-            }
-            desired -= i;
-            pn_raw_connection_give_read_buffers(conn->pn_raw_conn, 
raw_buffers, i);
+    size_t desired = 
pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+        "[C%"PRIu64"][L%"PRIu64"] Granting %zu to 
pn_raw_connection_give_read_buffers()",
+        conn->conn_id, conn->incoming_id, desired);
+    while (desired) {
+        size_t i;
+        for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+            qd_buffer_t *buf = qd_buffer();
+            raw_buffers[i].bytes = (char*) qd_buffer_base(buf);
+            raw_buffers[i].capacity = qd_buffer_capacity(buf);
+            raw_buffers[i].size = 0;
+            raw_buffers[i].offset = 0;
+            raw_buffers[i].context = (uintptr_t) buf;
         }
+        desired -= i;
+        pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i);
     }
 }
 
@@ -175,6 +198,9 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t 
context)
 
     if (tc->pn_raw_conn) {
         sys_atomic_set(&tc->q2_restart, 1);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"] q2 unblocked: call pn_raw_connection_wake()",
+               tc->conn_id);
         pn_raw_connection_wake(tc->pn_raw_conn);
     }
 
@@ -182,36 +208,14 @@ void qdr_tcp_q2_unblocked_handler(const 
qd_alloc_safe_ptr_t context)
 }
 
 
-// Fetch incoming raw incoming buffers from proton and pass them to
-// existing delivery or create a new delivery.
-// If close is pending then do not give more buffers to proton.
-static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending)
+// Extract buffers and their bytes from raw connection.
+// * Proton decides how many buffers are to be taken.
+// * Buffers with no data are freed.
+// * Buffers with data are appended to caller's buffers list.
+// * Add received byte count to connection stats
+// * Return the count of bytes in the buffers list
+static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, 
qd_buffer_list_t *buffers)
 {
-    //
-    // Don't initiate an ingress stream message if we don't yet have a 
reply-to address and credit.
-    //
-    if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && 
!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) 
{
-        if (conn->ingress && !conn->reply_to) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", 
conn->conn_id, conn->outgoing_id);
-        }
-        if (!conn->flow_enabled) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Waiting for credit to initiate message", 
conn->conn_id, conn->outgoing_id);
-        }
-        return 0;
-    }
-
-    //
-    // Don't read from proton if in Q2 holdoff
-    //
-    if (conn->q2_blocked) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
handle_incoming q2_blocked", conn->conn_id);
-        return 0;
-    }
-
-    // Read all buffers available from proton.
-    // Collect buffers for ingress; free empty buffers.
-    qd_buffer_list_t buffers;
-    DEQ_INIT(buffers);
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     size_t n;
     int count = 0;
@@ -221,31 +225,75 @@ static int handle_incoming_impl(qdr_tcp_connection_t 
*conn, bool close_pending)
             qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context;
             qd_buffer_insert(buf, raw_buffers[i].size);
             count += raw_buffers[i].size;
+
+            assert(raw_buffers[i].size == qd_buffer_size(buf));
             if (raw_buffers[i].size > 0) {
-                DEQ_INSERT_TAIL(buffers, buf);
+                DEQ_INSERT_TAIL(*buffers, buf);
             } else {
                 qd_buffer_free(buf);
                 free_count++;
             }
         }
     }
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read 
buffers", conn->conn_id, DEQ_SIZE(buffers));
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read 
buffers", conn->conn_id, free_count);
 
-    // Only grant more buffers to proton for reading if close is not pending
-    if (!close_pending) {
-        grant_read_buffers(conn);
+    if (count > 0) {
+        // account for any incoming bytes just read
+        conn->last_in_time = tcp_adaptor->core->uptime_ticks;
+        conn->bytes_in += count;
     }
 
-    if (conn->instream) {
-        qd_message_stream_data_append(qdr_delivery_message(conn->instream), 
&buffers, &conn->q2_blocked);
-        if (conn->q2_blocked) {
-            // note: unit tests grep for this log!
-            qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client 
link blocked on Q2 limit", conn->conn_id);
-        }
-        qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, 
conn->incoming_id, count);
-    } else {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+        "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took %zu, freed 
%i",
+        conn->conn_id, DEQ_SIZE(*buffers), free_count);
+
+    return count;
+}
+
+
+// Fetch incoming raw incoming buffers from proton and pass them to a delivery.
+// Create a new delivery if necessary.
+// Return number of bytes read from raw connection
+static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg)
+{
+    qd_log_source_t *log = tcp_adaptor->log_source;
+
+    qd_log(log, QD_LOG_TRACE,
+           "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. 
read_closed:%s, flow_enabled:%s",
+           conn->conn_id, conn->incoming_id, msg,
+           qdr_tcp_connection_role_name(conn),
+           conn->raw_closed_read ? "T" : "F",
+           conn->flow_enabled    ? "T" : "F");
+
+    if (conn->raw_read_shutdown) {
+        // Drain all read buffers that may still be in the raw connection
+        qd_log(log, QD_LOG_TRACE,
+            "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. 
drain read buffers",
+            conn->conn_id, conn->incoming_id, msg,
+            qdr_tcp_connection_role_name(conn));
+        qd_buffer_list_t buffers;
+        DEQ_INIT(buffers);
+        handle_incoming_raw_read(conn, &buffers);
+        qd_buffer_list_free_buffers(&buffers);
+        return 0;
+    }
+
+    // Don't initiate an ingress stream message
+    // if we don't yet have a reply-to address and credit.
+    if (conn->ingress && !conn->reply_to) {
+        qd_log(log, QD_LOG_DEBUG,
+                "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address before 
initiating ingress stream message",
+                conn->conn_id, conn->incoming_id);
+        return 0;
+    }
+    if (!conn->flow_enabled) {
+        qd_log(log, QD_LOG_DEBUG,
+                "[C%"PRIu64"][L%"PRIu64"] Waiting for credit before initiating 
ingress stream message",
+                conn->conn_id, conn->incoming_id);
+        return 0;
+    }
+
+    // Ensure existence of ingress stream message
+    if (!conn->instream) {
         qd_message_t *msg = qd_message();
 
         qd_message_set_stream_annotation(msg, true);
@@ -256,14 +304,19 @@ static int handle_incoming_impl(qdr_tcp_connection_t 
*conn, bool close_pending)
         qd_compose_insert_null(props);                      // user-id
         if (conn->ingress) {
             qd_compose_insert_string(props, conn->config.address); // to
-            qd_compose_insert_string(props, conn->global_id);   // subject
-            qd_compose_insert_string(props, conn->reply_to);    // reply-to
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Initiating ingress to: %s reply: %s", conn->conn_id, 
conn->incoming_id, conn->config.address, conn->reply_to);
+            qd_compose_insert_string(props, conn->global_id);      // subject
+            qd_compose_insert_string(props, conn->reply_to);       // reply-to
+            qd_log(log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream 
incoming link for %s connection to: %s reply: %s",
+                   conn->conn_id, conn->incoming_id, 
qdr_tcp_connection_role_name(conn),
+                   conn->config.address, conn->reply_to);
         } else {
-            qd_compose_insert_string(props, conn->reply_to); // to
-            qd_compose_insert_string(props, conn->global_id);   // subject
-            qd_compose_insert_null(props);    // reply-to
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Initiating egress to: %s", conn->conn_id, 
conn->incoming_id, conn->reply_to);
+            qd_compose_insert_string(props, conn->reply_to);  // to
+            qd_compose_insert_string(props, conn->global_id); // subject
+            qd_compose_insert_null(props);                    // reply-to
+            qd_log(log, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream 
incoming link for %s connection to: %s",
+                   conn->conn_id, conn->incoming_id, 
qdr_tcp_connection_role_name(conn), conn->reply_to);
         }
         //qd_compose_insert_null(props);                      // correlation-id
         //qd_compose_insert_null(props);                      // content-type
@@ -275,11 +328,6 @@ static int handle_incoming_impl(qdr_tcp_connection_t 
*conn, bool close_pending)
         //qd_compose_insert_null(props);                      // 
reply-to-group-id
         qd_compose_end_list(props);
 
-        if (count > 0) {
-            props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props);
-            qd_compose_insert_binary_buffers(props, &buffers);
-        }
-
         qd_message_compose_2(msg, props, false);
         qd_compose_free(props);
 
@@ -288,18 +336,65 @@ static int handle_incoming_impl(qdr_tcp_connection_t 
*conn, bool close_pending)
         qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, 
conn_sp);
 
         conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 
0, 0);
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, 
conn->incoming_id, count);
+
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"][D%"PRIu64"] Initiating ingress stream 
message with 0 bytes",
+               conn->conn_id, conn->incoming_id, conn->instream->delivery_id);
+
+        conn->incoming_started = true;
+    }
+
+    // Don't read from proton if in Q2 holdoff
+    if (conn->q2_blocked) {
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] handle_incoming q2_blocked for %s connection",
+               conn->conn_id,  qdr_tcp_connection_role_name(conn));
+        return 0;
     }
-    return count;
-}
 
+    // Read all buffers available from proton.
+    // Collect buffers for ingress; free empty buffers.
+    qd_buffer_list_t buffers;
+    DEQ_INIT(buffers);
+    int count = handle_incoming_raw_read(conn, &buffers);
 
-static int handle_incoming(qdr_tcp_connection_t *conn)
-{
-    // Normal incoming runs with no close pending
-    return handle_incoming_impl(conn, false);
+    // Grant more buffers to proton for reading if read side is still open
+    if (!conn->raw_closed_read) {
+        // normal path - keep on processing
+        grant_read_buffers(conn);
+    }
+
+    // Push the bytes just read into the streaming message
+    if (count > 0) {
+        qd_message_stream_data_append(qdr_delivery_message(conn->instream), 
&buffers, &conn->q2_blocked);
+        if (conn->q2_blocked) {
+            // note: unit tests grep for this log!
+            qd_log(log, QD_LOG_TRACE,
+                    "[C%"PRIu64"][L%"PRIu64"] client link blocked on Q2 limit",
+                    conn->conn_id, conn->incoming_id);
+        }
+        qdr_delivery_continue(tcp_adaptor->core, conn->instream, false);
+        qd_log(log, QD_LOG_DEBUG,
+                "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes",
+                conn->conn_id, conn->incoming_id, count);
+    } else {
+        assert (DEQ_SIZE(buffers) == 0);
+    }
+
+    // Close the stream message if read side has closed
+    if (conn->raw_closed_read) {
+        qd_log(log, QD_LOG_DEBUG,
+            "[C%"PRIu64"][L%"PRIu64"] close instream delivery",
+            conn->conn_id, conn->incoming_id);
+        qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
+        qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
+        conn->raw_read_shutdown = true;
+    }
+
+    return count;
 }
 
+
 static void flush_outgoing_buffs(qdr_tcp_connection_t *conn)
 {
     // Flush buffers staged for writing to raw conn
@@ -342,21 +437,29 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* 
tc)
 static void handle_disconnected(qdr_tcp_connection_t* conn)
 {
     if (conn->instream) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, 
conn->incoming_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream",
+               conn->conn_id, conn->incoming_id);
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
         qdr_delivery_decref(tcp_adaptor->core, conn->instream, 
"tcp-adaptor.handle_disconnected - instream");
     }
     if (conn->outstream) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, 
conn->outgoing_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close 
outstream",
+               conn->conn_id, conn->outgoing_id);
         qdr_delivery_decref(tcp_adaptor->core, conn->outstream, 
"tcp-adaptor.handle_disconnected - outstream");
     }
     if (conn->incoming) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", 
conn->conn_id, conn->incoming_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach 
incoming",
+               conn->conn_id, conn->incoming_id);
         qdr_link_detach(conn->incoming, QD_LOST, 0);
     }
     if (conn->outgoing) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", 
conn->conn_id, conn->outgoing_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach 
outgoing",
+               conn->conn_id, conn->outgoing_id);
         qdr_link_detach(conn->outgoing, QD_LOST, 0);
     }
     if (conn->qdr_conn) {
@@ -393,9 +496,14 @@ static int read_message_body(qdr_tcp_connection_t *conn, 
qd_message_t *msg, pn_r
         } else {
             switch (stream_data_result) {
             case QD_MESSAGE_STREAM_DATA_NO_MORE:
-                qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] 
EOS", conn->conn_id); break;
+                qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
+                       "[C%"PRIu64"] EOS", conn->conn_id);
+                conn->read_eos_seen = true;
+                break;
             case QD_MESSAGE_STREAM_DATA_INVALID:
-                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] 
Invalid body data for streaming message", conn->conn_id); break;
+                qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+                       "[C%"PRIu64"] Invalid body data for streaming message", 
conn->conn_id);
+                break;
             default:
                 break;
             }
@@ -459,11 +567,12 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t 
*conn)
                 bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx 
+ i].size;
             } else {
                 qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
-                       "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" 
of %"PRIu64")", conn->conn_id, i+1, used);
+                       "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" 
of %"PRIu64")",
+                       conn->conn_id, i+1, used);
             }
         }
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
-               "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written);
+               "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i bytes", 
conn->conn_id, bytes_written);
 
         conn->outgoing_buff_count -= used;
         conn->outgoing_buff_idx   += used;
@@ -501,8 +610,14 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
             }
         }
 
-        if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) 
{
-            pn_raw_connection_close(conn->pn_raw_conn);
+        if (conn->read_eos_seen) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] handle_outgoing calling 
pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s",
+                    conn->conn_id, qd_message_receive_complete(msg) ? "T" : 
"F", qd_message_send_complete(msg) ? "T" : "F");
+            sys_mutex_lock(conn->activation_lock);
+            conn->raw_closed_write = true;
+            sys_mutex_unlock(conn->activation_lock);
+            pn_raw_connection_write_close(conn->pn_raw_conn);
         }
     }
 }
@@ -600,8 +715,6 @@ static void 
qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
     tc->opened_time = tcp_adaptor->core->uptime_ticks;
     qdr_link_set_context(tc->incoming, tc);
 
-    grant_read_buffers(tc);
-
     qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, 
"add_tcp_connection");
     action->args.general.context_1 = tc;
     qdr_action_enqueue(tcp_adaptor->core, action);
@@ -614,13 +727,17 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     switch (pn_event_type(e)) {
     case PN_RAW_CONNECTION_CONNECTED: {
         if (conn->ingress) {
-            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED 
Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, 
conn->config.host_port, conn->remote_address, conn->global_id);
             qdr_tcp_connection_ingress_accept(conn);
+            qd_log(log, QD_LOG_INFO,
+                   "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted 
to %s from %s (global_id=%s)",
+                   conn->conn_id, conn->config.host_port, 
conn->remote_address, conn->global_id);
             break;
         } else {
             conn->remote_address = get_address_string(conn->pn_raw_conn);
             conn->opened_time = tcp_adaptor->core->uptime_ticks;
-            qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED 
Egress connected to %s", conn->conn_id, conn->remote_address);
+            qd_log(log, QD_LOG_INFO,
+                   "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected 
to %s",
+                   conn->conn_id, conn->remote_address);
             if (!!conn->initial_delivery) {
                 qdr_tcp_open_server_side_connection(conn);
             }
@@ -630,25 +747,28 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         }
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
-        conn->q2_blocked = false;
-        handle_incoming_impl(conn, true);
+        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ",
+               conn->conn_id, conn->incoming_id);
         sys_mutex_lock(conn->activation_lock);
+        conn->q2_blocked = false;
         conn->raw_closed_read = true;
         sys_mutex_unlock(conn->activation_lock);
-        pn_raw_connection_close(conn->pn_raw_conn);
+        handle_incoming(conn, "PNRC_CLOSED_READ");
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE",
+               conn->conn_id);
         sys_mutex_lock(conn->activation_lock);
         conn->raw_closed_write = true;
         sys_mutex_unlock(conn->activation_lock);
-        pn_raw_connection_close(conn->pn_raw_conn);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
-        qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] 
PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id);
+        qd_log(log, QD_LOG_INFO,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED",
+               conn->conn_id);
         sys_mutex_lock(conn->activation_lock);
         conn->pn_raw_conn = 0;
         sys_mutex_unlock(conn->activation_lock);
@@ -656,38 +776,53 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] 
PN_RAW_CONNECTION_NEED_WRITE_BUFFERS", conn->conn_id);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_WRITE_BUFFERS",
+               conn->conn_id);
         while (qdr_connection_process(conn->qdr_conn)) {}
         handle_outgoing(conn);
         break;
     }
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] 
PN_RAW_CONNECTION_NEED_READ_BUFFERS", conn->conn_id);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS",
+               conn->conn_id);
         while (qdr_connection_process(conn->qdr_conn)) {}
-        handle_incoming(conn);
+        if (conn->incoming_started) {
+            grant_read_buffers(conn);
+            handle_incoming(conn, "PNRC_NEED_READ_BUFFERS");
+        }
         break;
     }
     case PN_RAW_CONNECTION_WAKE: {
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", 
conn->conn_id);
-        sys_mutex_lock(conn->activation_lock);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE",
+               conn->conn_id);
         if (sys_atomic_set(&conn->q2_restart, 0)) {
-            sys_mutex_unlock(conn->activation_lock);
-            // note: unit tests grep for this log!
-            qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from 
Q2 limit", conn->conn_id);
+            sys_mutex_lock(conn->activation_lock);
             conn->q2_blocked = false;
-            handle_incoming(conn);
-        }
-        else {
             sys_mutex_unlock(conn->activation_lock);
+            // note: unit tests grep for this log!
+            qd_log(log, QD_LOG_TRACE,
+                   "[C%"PRIu64"] client link unblocked from Q2 limit",
+                   conn->conn_id);
+            handle_incoming(conn, "PNRC_WAKE after Q2 unblock");
         }
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
     case PN_RAW_CONNECTION_READ: {
-        int read = handle_incoming(conn);
-        conn->last_in_time = tcp_adaptor->core->uptime_ticks;
-        conn->bytes_in += read;
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i 
bytes. Total read %"PRIu64" bytes", conn->conn_id, read, conn->bytes_in);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_READ Event ",
+               conn->conn_id);
+        int read = 0;
+        if (conn->incoming_started) {
+            // Streaming message exists. Process read normally.
+            read = handle_incoming(conn, "PNRC_READ");
+        }
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read 
%"PRIu64" bytes",
+               conn->conn_id, read, conn->bytes_in);
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -705,7 +840,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         }
         conn->last_out_time = tcp_adaptor->core->uptime_ticks;
         conn->bytes_out += written;
-        qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN 
Wrote %zu bytes. Total written %"PRIu64" bytes", conn->conn_id, written, 
conn->bytes_out);
+        qd_log(log, QD_LOG_DEBUG,
+               "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN 
pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" 
bytes",
+               conn->conn_id, written, conn->bytes_out);
         while (qdr_connection_process(conn->qdr_conn)) {}
         break;
     }
@@ -732,6 +869,7 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
     //event on another thread, which is where the rest of the
     //initialisation will happen, through a call to
     //qdr_tcp_connection_ingress_accept
+    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] call 
pn_listener_raw_accept()", tc->conn_id);
     pn_listener_raw_accept(listener->pn_listener, tc->pn_raw_conn);
     return tc;
 }
@@ -742,21 +880,21 @@ static void 
qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
     const char *host = tc->egress_dispatcher ? "egress-dispatch" : 
tc->config.host_port;
     qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening 
server-side core connection %s", tc->conn_id, host);
 
-    qdr_connection_info_t *info = qdr_connection_info(false, //bool            
 is_encrypted,
-                                                      false, //bool            
 is_authenticated,
-                                                      true,  //bool            
 opened,
-                                                      "",   //char            
*sasl_mechanisms,
+    qdr_connection_info_t *info = qdr_connection_info(false,       //bool      
       is_encrypted,
+                                                      false,       //bool      
       is_authenticated,
+                                                      true,        //bool      
       opened,
+                                                      "",          //char      
      *sasl_mechanisms,
                                                       QD_OUTGOING, 
//qd_direction_t   dir,
-                                                      host,  //const char      
*host,
-                                                      "",    //const char      
*ssl_proto,
-                                                      "",    //const char      
*ssl_cipher,
-                                                      "",    //const char      
*user,
-                                                      "TcpAdaptor",    //const 
char      *container,
-                                                      0,     //pn_data_t       
*connection_properties,
-                                                      0,     //int             
 ssl_ssf,
-                                                      false, //bool            
 ssl,
-                                                      "",                  // 
peer router version,
-                                                      false);              // 
streaming links
+                                                      host,        //const 
char      *host,
+                                                      "",          //const 
char      *ssl_proto,
+                                                      "",          //const 
char      *ssl_cipher,
+                                                      "",          //const 
char      *user,
+                                                      "TcpAdaptor",//const 
char      *container,
+                                                      0,           //pn_data_t 
      *connection_properties,
+                                                      0,           //int       
       ssl_ssf,
+                                                      false,       //bool      
       ssl,
+                                                      "",          // peer 
router version,
+                                                      false);      // 
streaming links
 
     qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core,
                                                    tcp_adaptor->adaptor,
@@ -782,6 +920,12 @@ static void 
qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
 
     // This attach passes the ownership of the delivery from the core-side 
connection and link
     // to the adaptor-side outgoing connection and link.
+    uint64_t i_conn_id = 0;
+    uint64_t i_link_id = 0;
+    if (!!tc->initial_delivery) {
+        i_conn_id = tc->initial_delivery->conn_id;
+        i_link_id = tc->initial_delivery->link_id;
+    }
     tc->outgoing = qdr_link_first_attach(conn,
                                          QD_OUTGOING,
                                          source,           //qdr_terminus_t   
*source,
@@ -792,8 +936,10 @@ static void 
qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc)
                                          tc->initial_delivery,
                                          &(tc->outgoing_id));
     if (!!tc->initial_delivery) {
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" 
initial_delivery ownership passed to "DLV_FMT,
-               DLV_ARGS(tc->initial_delivery), tc->outgoing->conn_id, 
tc->outgoing->identity, tc->initial_delivery->delivery_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               DLV_FMT" initial_delivery ownership passed to "DLV_FMT,
+               i_conn_id, i_link_id, tc->initial_delivery->delivery_id,
+               tc->outgoing->conn_id, tc->outgoing->identity, 
tc->initial_delivery->delivery_id);
         qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, 
"tcp-adaptor - passing initial_delivery into new link");
         tc->initial_delivery = 0;
     }
@@ -823,14 +969,17 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
     tc->conn_id = qd_server_allocate_connection_id(tc->server);
 
     //
-    // If this is the egress dispatcher, set up the core connection now.  
Otherwise, set up a physical
-    // raw connection and wait until we are running in that connection's 
context to set up the core
+    // If this is the egress dispatcher, set up the core connection now.
+    // Otherwise, set up a physical raw connection and wait until we are
+    // running in that connection's context to set up the core
     // connection.
     //
     if (tc->egress_dispatcher)
         qdr_tcp_open_server_side_connection(tc);
     else {
-        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting 
to: %s", tc->conn_id, tc->config.host_port);
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
+               "[C%"PRIu64"] call pn_proactor_raw_connect(). Egress connecting 
to: %s",
+               tc->conn_id, tc->config.host_port);
         tc->pn_raw_conn = pn_raw_connection();
         pn_raw_connection_set_context(tc->pn_raw_conn, tc);
         pn_proactor_raw_connect(qd_server_proactor(tc->server), 
tc->pn_raw_conn, tc->config.host_port);
@@ -900,7 +1049,7 @@ static void handle_listener_event(pn_event_t *e, 
qd_server_t *qd_server, void *c
     }
 
     case PN_LISTENER_ACCEPT: {
-        qd_log(log, QD_LOG_INFO, "PN_LISTENER_ACCEPT Accepting TCP connection 
on %s", host_port);
+        qd_log(log, QD_LOG_INFO, "PN_LISTENER_ACCEPT Accepting TCP connection 
to %s", host_port);
         qdr_tcp_connection_ingress(li);
         break;
     }
@@ -977,7 +1126,9 @@ void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, 
void *impl)
             pn_listener_close(li->pn_listener);
         }
         DEQ_REMOVE(tcp_adaptor->listeners, li);
-        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for 
%s, %s:%s", li->config.address, li->config.host, li->config.port);
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
+               "Deleted TcpListener for %s, %s:%s",
+               li->config.address, li->config.host, li->config.port);
         qd_tcp_listener_decref(li);
     }
 }
@@ -1033,7 +1184,9 @@ void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, 
void *impl)
     if (ct) {
         //need to close the pseudo-connection used for dispatching
         //deliveries out to live connnections:
-        qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for 
%s, %s:%s", ct->config.address, ct->config.host, ct->config.port);
+        qd_log(tcp_adaptor->log_source, QD_LOG_INFO,
+               "Deleted TcpConnector for %s, %s:%s",
+               ct->config.address, ct->config.host, ct->config.port);
         close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher);
         DEQ_REMOVE(tcp_adaptor->connectors, ct);
         qd_tcp_connector_decref(ct);
@@ -1052,7 +1205,9 @@ static void qdr_tcp_first_attach(void *context, 
qdr_connection_t *conn, qdr_link
     void *tcontext = qdr_connection_get_context(conn);
     if (tcontext) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: 
no link context");
         assert(false);
@@ -1078,21 +1233,23 @@ static void qdr_tcp_second_attach(void *context, 
qdr_link_t *link,
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
         if (qdr_link_direction(link) == QD_OUTGOING) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->outgoing_id);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] %s qdr_tcp_second_attach",
+                   tc->conn_id, tc->outgoing_id,
+                   qdr_tcp_quadrant_id(tc, link));
             if (tc->ingress) {
                 qdr_tcp_connection_copy_reply_to(tc, 
qdr_terminus_get_address(source));
                 // for ingress, can start reading from socket once we have
                 // a reply to address, as that is when we are able to send
                 // out a message
-                grant_read_buffers(tc);
-                handle_incoming(tc);
+                handle_incoming(tc, "qdr_tcp_second_attach");
             }
             qdr_link_flow(tcp_adaptor->core, link, 10, false);
         } else if (!tc->ingress) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->incoming_id);
-            //for egress we can start reading from the socket once we
-            //have the link to send messages over
-            grant_read_buffers(tc);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] %s qdr_tcp_second_attach",
+                   tc->conn_id, tc->incoming_id,
+               qdr_tcp_quadrant_id(tc, link));
         }
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_second_attach: 
no link context");
@@ -1115,11 +1272,13 @@ static void qdr_tcp_flow(void *context, qdr_link_t 
*link, int credit)
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
         if (!conn->flow_enabled && credit > 0) {
             conn->flow_enabled = true;
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d",
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, 
credit=%d",
                    conn->conn_id, conn->outgoing_id, credit);
-            handle_incoming(conn);
+            handle_incoming(conn, "qdr_tcp_flow");
         } else {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d",
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. 
enabled:%s, credit:%d",
                    conn->conn_id, qdr_tcp_conn_linkid(conn), 
conn->flow_enabled?"T":"F", credit);
         }
     } else {
@@ -1134,7 +1293,9 @@ static void qdr_tcp_offer(void *context, qdr_link_t 
*link, int delivery_count)
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_offer: no link 
context");
         assert(false);
@@ -1148,7 +1309,9 @@ static void qdr_tcp_drained(void *context, qdr_link_t 
*link)
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drained: no 
link context");
         assert(false);
@@ -1161,7 +1324,9 @@ static void qdr_tcp_drain(void *context, qdr_link_t 
*link, bool mode)
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drain: no link 
context");
         assert(false);
@@ -1174,7 +1339,9 @@ static int qdr_tcp_push(void *context, qdr_link_t *link, 
int limit)
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
         return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_push: no link 
context");
@@ -1189,9 +1356,11 @@ static uint64_t qdr_tcp_deliver(void *context, 
qdr_link_t *link, qdr_delivery_t
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_deliver 
Delivery event", DLV_ARGS(delivery));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               DLV_FMT" qdr_tcp_deliver Delivery event", DLV_ARGS(delivery));
         if (tc->egress_dispatcher) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" tcp_adaptor 
initiating egress connection", DLV_ARGS(delivery));
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   DLV_FMT" tcp_adaptor initiating egress connection", 
DLV_ARGS(delivery));
             qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
             return QD_DELIVERY_MOVED_TO_NEW_LINK;
         } else if (!tc->outstream) {
@@ -1213,20 +1382,25 @@ static uint64_t qdr_tcp_deliver(void *context, 
qdr_link_t *link, qdr_delivery_t
                 tc->incoming = qdr_link_first_attach(tc->qdr_conn,
                                                      QD_INCOMING,
                                                      qdr_terminus(0),  
//qdr_terminus_t   *source,
-                                                     target, //qdr_terminus_t  
 *target,
+                                                     target,           
//qdr_terminus_t   *target,
                                                      "tcp.egress.in",  //const 
char       *name,
                                                      0,                //const 
char       *terminus_addr,
                                                      false,
                                                      NULL,
                                                      &(tc->incoming_id));
-                qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Create Link to %s", tc->conn_id, 
tc->incoming->identity, tc->reply_to);
+                assert(tc);
+                assert(tc->incoming);
+                qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                       "[C%"PRIu64"][L%"PRIu64"] %s Created link to %s",
+                       tc->conn_id, tc->incoming->identity,
+                       qdr_tcp_quadrant_id(tc, tc->incoming), tc->reply_to);
                 qdr_link_set_context(tc->incoming, tc);
                 //add this connection to those visible through management now 
that we have the global_id
                 qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, 
"add_tcp_connection");
                 action->args.general.context_1 = tc;
                 qdr_action_enqueue(tcp_adaptor->core, action);
 
-                handle_incoming(tc);
+                handle_incoming(tc, "qdr_tcp_deliver");
             }
         }
         handle_outgoing(tc);
@@ -1243,7 +1417,9 @@ static int qdr_tcp_get_credit(void *context, qdr_link_t 
*link)
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_get_credit: no 
link context");
         assert(false);
@@ -1257,13 +1433,19 @@ static void qdr_tcp_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t
     void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" 
qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s",
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               DLV_FMT" qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s",
                DLV_ARGS(dlv), disp, settled ? "true" : "false");
 
-        //
-        // If one of the streaming deliveries is ever settled, the connection 
must be torn down.
-        //
         if (settled) {
+            // the only settlement occurs when the initial delivery is
+            // settled, which occurs when the connector is unable to
+            // connect to the configured tcp endpoint, so in this case
+            // we can just close the connection
+            // (The end of the message is used to convey half closed status)
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   DLV_FMT" qdr_tcp_delivery_update: call 
pn_raw_connection_close()",
+                   DLV_ARGS(dlv));
             pn_raw_connection_close(tc->pn_raw_conn);
         }
     } else {
@@ -1278,7 +1460,8 @@ static void qdr_tcp_conn_close(void *context, 
qdr_connection_t *conn, qdr_error_
     void *tcontext = qdr_connection_get_context(conn);
     if (tcontext) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", 
conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_close: no 
connection context");
         assert(false);
@@ -1291,9 +1474,12 @@ static void qdr_tcp_conn_trace(void *context, 
qdr_connection_t *conn, bool trace
     void *tcontext = qdr_connection_get_context(conn);
     if (tcontext) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP",
+               conn->conn_id, qdr_tcp_conn_linkid(conn));
     } else {
-        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_trace: no 
connection context");
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
+               "qdr_tcp_conn_trace: no connection context");
         assert(false);
     }
 }
@@ -1304,8 +1490,9 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
         sys_mutex_lock(conn->activation_lock);
-        if (conn->pn_raw_conn && !(conn->raw_closed_read || 
conn->raw_closed_write)) {
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_tcp_activate: waking raw connection", conn->conn_id);
+        if (conn->pn_raw_conn && !(conn->raw_closed_read && 
conn->raw_closed_write)) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] qdr_tcp_activate: call 
pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
             sys_mutex_unlock(conn->activation_lock);
         } else if (conn->activate_timer) {
@@ -1316,11 +1503,13 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
             // received. Prior to that however a subscribing link (and
             // its associated connection must be setup), for which we
             // fake wakeup by using a timer.
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_tcp_activate: schedule activate_timer", conn->conn_id);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", 
conn->conn_id);
             qd_timer_schedule(conn->activate_timer, 0);
         } else {
             sys_mutex_unlock(conn->activation_lock);
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_tcp_activate: Cannot activate", conn->conn_id);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+                   "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", 
conn->conn_id);
         }
     } else {
         qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no 
connection context");
@@ -1342,7 +1531,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void 
**adaptor_context)
     adaptor->adaptor = qdr_protocol_adaptor(core,
                                             "tcp",                // name
                                             adaptor,              // context
-                                            qdr_tcp_activate,                  
  // activate
+                                            qdr_tcp_activate,
                                             qdr_tcp_first_attach,
                                             qdr_tcp_second_attach,
                                             qdr_tcp_detach,
@@ -1563,7 +1752,8 @@ static qdr_tcp_connection_t *find_by_identity(qdr_core_t 
*core, qd_iterator_t *i
 
 void qdra_tcp_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, 
int offset)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "query for first tcp 
connection (%i)", offset);
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
+           "query for first tcp connection (%i)", offset);
     query->status = QD_AMQP_OK;
 
     if (offset >= DEQ_SIZE(tcp_adaptor->connections)) {
@@ -1618,7 +1808,8 @@ void qdra_tcp_connection_get_CT(qdr_core_t          *core,
     if (!identity) {
         query->status = QD_AMQP_BAD_REQUEST;
         query->status.description = "Name not supported. Identity required";
-        qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: 
%s", TCP_CONNECTION_TYPE, query->status.description);
+        qd_log(core->agent_log, QD_LOG_ERROR,
+               "Error performing READ of %s: %s", TCP_CONNECTION_TYPE, 
query->status.description);
     } else {
         conn = find_by_identity(core, identity);
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to