This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 29f4929  DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output
29f4929 is described below

commit 29f49295b3e695211b07241651fc5375894d71fe
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Tue Nov 24 09:40:29 2020 -0500

    DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output
    
    This closes #928
---
 src/adaptors/tcp_adaptor.c | 201 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 143 insertions(+), 58 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index 7476808..fe8ee26 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -94,6 +94,12 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, 
qdr_action_t *action, bo
 static void handle_disconnected(qdr_tcp_connection_t* conn);
 static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 
+static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn)
+{
+    assert(conn);
+    return conn->instream ? conn->incoming_id : conn->outgoing_id;
+}
+
 static void on_activate(void *context)
 {
     qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
@@ -227,18 +233,23 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* 
tc)
 
 static void handle_disconnected(qdr_tcp_connection_t* conn)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
handle_disconnected", conn->conn_id);
     if (conn->instream) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, 
conn->incoming_id);
         qd_message_set_receive_complete(qdr_delivery_message(conn->instream));
         qdr_delivery_continue(tcp_adaptor->core, conn->instream, true);
         qdr_delivery_decref(tcp_adaptor->core, conn->instream, 
"tcp-adaptor.handle_disconnected");
     }
     if (conn->outstream) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, 
conn->outgoing_id);
         qdr_delivery_decref(tcp_adaptor->core, conn->outstream, 
"tcp-adaptor.handle_disconnected");
     }
     if (conn->incoming) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", 
conn->conn_id, conn->incoming_id);
         qdr_link_detach(conn->incoming, QD_LOST, 0);
     }
     if (conn->outgoing) {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", 
conn->conn_id, conn->outgoing_id);
         qdr_link_detach(conn->outgoing, QD_LOST, 0);
     }
     qdr_connection_closed(conn->conn);
@@ -868,7 +879,14 @@ static void qdr_tcp_first_attach(void *context, 
qdr_connection_t *conn, qdr_link
                                  qdr_terminus_t *source, qdr_terminus_t 
*target,
                                  qd_session_class_t session_class)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_first_attach");
+    void *tcontext = qdr_connection_get_context(conn);
+    if (tcontext) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: 
no link context");
+        assert(false);
+    }
 }
 
 static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, 
qd_iterator_t* reply_to)
@@ -886,11 +904,10 @@ static void 
qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_itera
 static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
                                   qdr_terminus_t *source, qdr_terminus_t 
*target)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_second_attach");
-    
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, 
qdr_tcp_conn_linkid(tc));
         if (qdr_link_direction(link) == QD_OUTGOING) {
             if (tc->ingress) {
                 qdr_tcp_connection_copy_reply_to(tc, 
qdr_terminus_get_address(source));
@@ -906,101 +923,142 @@ static void qdr_tcp_second_attach(void *context, 
qdr_link_t *link,
             //have the link to send messages over
             grant_read_buffers(tc);
         }
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_second_attach: 
no link context");
+        assert(false);
     }
 }
 
 
 static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t 
*error, bool first, bool close)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_detach");
+    qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_detach");
     assert(false);
 }
 
 
 static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_flow");
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
         if (!conn->flow_enabled && credit > 0) {
             conn->flow_enabled = true;
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Flow enabled", conn->conn_id, conn->outgoing_id);
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d",
+                   conn->conn_id, conn->outgoing_id, credit);
             handle_incoming(conn);
+        } else {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d",
+                   conn->conn_id, qdr_tcp_conn_linkid(conn), 
conn->flow_enabled?"T":"F", credit);
         }
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_flow: no link 
context");
+        assert(false);
     }
 }
 
 
 static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_offer");
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_offer: no link 
context");
+        assert(false);
+    }
+
 }
 
 
 static void qdr_tcp_drained(void *context, qdr_link_t *link)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained");
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drained: no 
link context");
+        assert(false);
+    }
 }
 
 
 static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained");
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drain: no link 
context");
+        assert(false);
+    }
 }
 
 
 static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_push");
-    return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+        return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_push: no link 
context");
+        assert(false);
+        return 0;
+    }
 }
 
 
 static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_deliver");
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
-            qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Delivery event", tc->conn_id, tc->outgoing_id);
-            if (tc->egress_dispatcher) {
-                qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
-            } else if (!tc->outstream) {
-                tc->outstream = delivery;
-                qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");
-                if (!tc->ingress) {
-                    //on egress, can only set up link for the reverse
-                    //direction once we receive the first part of the
-                    //message from client to server
-                    qd_message_t *msg = qdr_delivery_message(delivery);
-                    qd_iterator_t *f_iter = qd_message_field_iterator(msg, 
QD_FIELD_SUBJECT);
-                    qdr_tcp_connection_copy_global_id(tc, f_iter);
-                    qd_iterator_free(f_iter);
-                    f_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
-                    qdr_tcp_connection_copy_reply_to(tc, f_iter);
-                    qd_iterator_free(f_iter);
-                    qdr_terminus_t *target = qdr_terminus(0);
-                    qdr_terminus_set_address(target, tc->reply_to);
-                    tc->incoming = qdr_link_first_attach(tc->conn,
-                                                         QD_INCOMING,
-                                                         qdr_terminus(0),  
//qdr_terminus_t   *source,
-                                                         target, 
//qdr_terminus_t   *target,
-                                                         "tcp.egress.in",  
//const char       *name,
-                                                         0,                
//const char       *terminus_addr,
-                                                         false,
-                                                         NULL,
-                                                         &(tc->incoming_id));
-                    qdr_link_set_context(tc->incoming, tc);
-                    //add this connection to those visible through management 
now that we have the global_id
-                    qdr_action_t *action = 
qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
-                    action->args.general.context_1 = tc;
-                    qdr_action_enqueue(tcp_adaptor->core, action);
-
-                    handle_incoming(tc);
-                }
+        qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, 
tc->outgoing_id);
+        if (tc->egress_dispatcher) {
+            qdr_tcp_connection_egress(&(tc->config), tc->server, delivery);
+        } else if (!tc->outstream) {
+            tc->outstream = delivery;
+            qdr_delivery_incref(delivery, "tcp_adaptor - new outstream");
+            if (!tc->ingress) {
+                //on egress, can only set up link for the reverse
+                //direction once we receive the first part of the
+                //message from client to server
+                qd_message_t *msg = qdr_delivery_message(delivery);
+                qd_iterator_t *f_iter = qd_message_field_iterator(msg, 
QD_FIELD_SUBJECT);
+                qdr_tcp_connection_copy_global_id(tc, f_iter);
+                qd_iterator_free(f_iter);
+                f_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+                qdr_tcp_connection_copy_reply_to(tc, f_iter);
+                qd_iterator_free(f_iter);
+                qdr_terminus_t *target = qdr_terminus(0);
+                qdr_terminus_set_address(target, tc->reply_to);
+                tc->incoming = qdr_link_first_attach(tc->conn,
+                                                        QD_INCOMING,
+                                                        qdr_terminus(0),  
//qdr_terminus_t   *source,
+                                                        target, 
//qdr_terminus_t   *target,
+                                                        "tcp.egress.in",  
//const char       *name,
+                                                        0,                
//const char       *terminus_addr,
+                                                        false,
+                                                        NULL,
+                                                        &(tc->incoming_id));
+                qdr_link_set_context(tc->incoming, tc);
+                //add this connection to those visible through management now 
that we have the global_id
+                qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, 
"add_tcp_connection");
+                action->args.general.context_1 = tc;
+                qdr_action_enqueue(tcp_adaptor->core, action);
+
+                handle_incoming(tc);
             }
-            handle_outgoing(tc);
+        }
+        handle_outgoing(tc);
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_deliver: no 
link context");
+        assert(false);
     }
     return 0;
 }
@@ -1008,19 +1066,25 @@ static uint64_t qdr_tcp_deliver(void *context, 
qdr_link_t *link, qdr_delivery_t
 
 static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_get_credit");
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_get_credit: no 
link context");
+        assert(false);
+    }
     return 10;
 }
 
 
 static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_delivery_update");
     void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery 
update disp: %"PRIu64", settled: %s",
-               tc->conn_id, disp, settled ? "true" : "false");
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_delivery_update: disp: %"PRIu64", settled: 
%s",
+               tc->conn_id, qdr_tcp_conn_linkid(tc), disp, settled ? "true" : 
"false");
 
         //
         // If one of the streaming deliveries is ever settled, the connection 
must be torn down.
@@ -1028,28 +1092,45 @@ static void qdr_tcp_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t
         if (settled) {
             pn_raw_connection_close(tc->socket);
         }
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, 
"qdr_tcp_delivery_update: no link context");
+        assert(false);
     }
 }
 
 
 static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, 
qdr_error_t *error)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_close");
+    void *tcontext = qdr_connection_get_context(conn);
+    if (tcontext) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_close: no 
connection context");
+        assert(false);
+    }
 }
 
 
 static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool 
trace)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_trace");
+    void *tcontext = qdr_connection_get_context(conn);
+    if (tcontext) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext;
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", conn->conn_id, 
qdr_tcp_conn_linkid(conn));
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_trace: no 
connection context");
+        assert(false);
+    }
 }
 
 static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate");
     void *context = qdr_connection_get_context(c);
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
         if (conn->socket) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_tcp_activate: waking raw connection", conn->conn_id);
             pn_raw_connection_wake(conn->socket);
         } else if (conn->activate_timer) {
             // On egress, the raw connection is only created once the
@@ -1058,10 +1139,14 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
             // received. Prior to that however a subscribing link (and
             // its associated connection must be setup), for which we
             // fake wakeup by using a timer.
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] 
qdr_tcp_activate: schedule activate_timer", conn->conn_id);
             qd_timer_schedule(conn->activate_timer, 0);
         } else {
-            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Cannot 
activate", conn->conn_id);
+            qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] 
qdr_tcp_activate: Cannot activate", conn->conn_id);
         }
+    } else {
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no 
connection context");
+        // assert(false); This is routine. TODO: Is that a problem?
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to