This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 29f4929 DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output 29f4929 is described below commit 29f49295b3e695211b07241651fc5375894d71fe Author: Chuck Rolke <c...@apache.org> AuthorDate: Tue Nov 24 09:40:29 2020 -0500 DISPATCH-1851: TCP adaptor - add more connection and link IDs to log output This closes #928 --- src/adaptors/tcp_adaptor.c | 201 ++++++++++++++++++++++++++++++++------------- 1 file changed, 143 insertions(+), 58 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 7476808..fe8ee26 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -94,6 +94,12 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo static void handle_disconnected(qdr_tcp_connection_t* conn); static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); +static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) +{ + assert(conn); + return conn->instream ? conn->incoming_id : conn->outgoing_id; +} + static void on_activate(void *context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; @@ -227,18 +233,23 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) static void handle_disconnected(qdr_tcp_connection_t* conn) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_disconnected", conn->conn_id); if (conn->instream) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected"); } if (conn->outstream) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id); qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected"); } if (conn->incoming) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id); qdr_link_detach(conn->incoming, QD_LOST, 0); } if (conn->outgoing) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", conn->conn_id, conn->outgoing_id); qdr_link_detach(conn->outgoing, QD_LOST, 0); } qdr_connection_closed(conn->conn); @@ -868,7 +879,14 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link qdr_terminus_t *source, qdr_terminus_t *target, qd_session_class_t session_class) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_first_attach"); + void *tcontext = qdr_connection_get_context(conn); + if (tcontext) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: no link context"); + assert(false); + } } static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to) @@ -886,11 +904,10 @@ static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_itera static void qdr_tcp_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_second_attach"); - void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, qdr_tcp_conn_linkid(tc)); if (qdr_link_direction(link) == QD_OUTGOING) { if (tc->ingress) { qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source)); @@ -906,101 +923,142 @@ static void qdr_tcp_second_attach(void *context, qdr_link_t *link, //have the link to send messages over grant_read_buffers(tc); } + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_second_attach: no link context"); + assert(false); } } static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_detach"); + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_detach"); assert(false); } static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_flow"); void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; if (!conn->flow_enabled && credit > 0) { conn->flow_enabled = true; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Flow enabled", conn->conn_id, conn->outgoing_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d", + conn->conn_id, conn->outgoing_id, credit); handle_incoming(conn); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d", + conn->conn_id, qdr_tcp_conn_linkid(conn), conn->flow_enabled?"T":"F", credit); } + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_flow: no link context"); + assert(false); } } static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_offer"); + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_offer: no link context"); + assert(false); + } + } static void qdr_tcp_drained(void *context, qdr_link_t *link) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained"); + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drained: no link context"); + assert(false); + } } static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained"); + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drain: no link context"); + assert(false); + } } static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_push"); - return qdr_link_process_deliveries(tcp_adaptor->core, link, limit); + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", conn->conn_id, qdr_tcp_conn_linkid(conn)); + return qdr_link_process_deliveries(tcp_adaptor->core, link, limit); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_push: no link context"); + assert(false); + return 0; + } } static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_deliver"); void* link_context = qdr_link_get_context(link); if (link_context) { - qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Delivery event", tc->conn_id, tc->outgoing_id); - if (tc->egress_dispatcher) { - qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); - } else if (!tc->outstream) { - tc->outstream = delivery; - qdr_delivery_incref(delivery, "tcp_adaptor - new outstream"); - if (!tc->ingress) { - //on egress, can only set up link for the reverse - //direction once we receive the first part of the - //message from client to server - qd_message_t *msg = qdr_delivery_message(delivery); - qd_iterator_t *f_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT); - qdr_tcp_connection_copy_global_id(tc, f_iter); - qd_iterator_free(f_iter); - f_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); - qdr_tcp_connection_copy_reply_to(tc, f_iter); - qd_iterator_free(f_iter); - qdr_terminus_t *target = qdr_terminus(0); - qdr_terminus_set_address(target, tc->reply_to); - tc->incoming = qdr_link_first_attach(tc->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "tcp.egress.in", //const char *name, - 0, //const char *terminus_addr, - false, - NULL, - &(tc->incoming_id)); - qdr_link_set_context(tc->incoming, tc); - //add this connection to those visible through management now that we have the global_id - qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); - action->args.general.context_1 = tc; - qdr_action_enqueue(tcp_adaptor->core, action); - - handle_incoming(tc); - } + qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, tc->outgoing_id); + if (tc->egress_dispatcher) { + qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); + } else if (!tc->outstream) { + tc->outstream = delivery; + qdr_delivery_incref(delivery, "tcp_adaptor - new outstream"); + if (!tc->ingress) { + //on egress, can only set up link for the reverse + //direction once we receive the first part of the + //message from client to server + qd_message_t *msg = qdr_delivery_message(delivery); + qd_iterator_t *f_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT); + qdr_tcp_connection_copy_global_id(tc, f_iter); + qd_iterator_free(f_iter); + f_iter = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); + qdr_tcp_connection_copy_reply_to(tc, f_iter); + qd_iterator_free(f_iter); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, tc->reply_to); + tc->incoming = qdr_link_first_attach(tc->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "tcp.egress.in", //const char *name, + 0, //const char *terminus_addr, + false, + NULL, + &(tc->incoming_id)); + qdr_link_set_context(tc->incoming, tc); + //add this connection to those visible through management now that we have the global_id + qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); + action->args.general.context_1 = tc; + qdr_action_enqueue(tcp_adaptor->core, action); + + handle_incoming(tc); } - handle_outgoing(tc); + } + handle_outgoing(tc); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_deliver: no link context"); + assert(false); } return 0; } @@ -1008,19 +1066,25 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t static int qdr_tcp_get_credit(void *context, qdr_link_t *link) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_get_credit"); + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_get_credit: no link context"); + assert(false); + } return 10; } static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_delivery_update"); void* link_context = qdr_link_get_context(qdr_delivery_link(dlv)); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery update disp: %"PRIu64", settled: %s", - tc->conn_id, disp, settled ? "true" : "false"); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s", + tc->conn_id, qdr_tcp_conn_linkid(tc), disp, settled ? "true" : "false"); // // If one of the streaming deliveries is ever settled, the connection must be torn down. @@ -1028,28 +1092,45 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t if (settled) { pn_raw_connection_close(tc->socket); } + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_delivery_update: no link context"); + assert(false); } } static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_close"); + void *tcontext = qdr_connection_get_context(conn); + if (tcontext) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_close: no connection context"); + assert(false); + } } static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_trace"); + void *tcontext = qdr_connection_get_context(conn); + if (tcontext) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_trace: no connection context"); + assert(false); + } } static void qdr_tcp_activate(void *notused, qdr_connection_t *c) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate"); void *context = qdr_connection_get_context(c); if (context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; if (conn->socket) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id); pn_raw_connection_wake(conn->socket); } else if (conn->activate_timer) { // On egress, the raw connection is only created once the @@ -1058,10 +1139,14 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c) // received. Prior to that however a subscribing link (and // its associated connection must be setup), for which we // fake wakeup by using a timer. + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", conn->conn_id); qd_timer_schedule(conn->activate_timer, 0); } else { - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Cannot activate", conn->conn_id); + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", conn->conn_id); } + } else { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no connection context"); + // assert(false); This is routine. TODO: Is that a problem? } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org