This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 27ac4be023ec8f7f69cf00bbf3441ee408240e86 Author: Ted Ross <tr...@apache.org> AuthorDate: Thu Nov 12 14:02:40 2020 -0500 DISPATCH-1826 - Drop raw connections when stream messages are settled. --- src/adaptors/tcp_adaptor.c | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index c585553..742a42c 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -490,6 +490,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void conn->opened_time = tcp_adaptor->core->uptime_ticks; qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id); while (qdr_connection_process(conn->conn)) {} + handle_outgoing(conn); break; } } @@ -866,6 +867,7 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link qdr_terminus_t *source, qdr_terminus_t *target, qd_session_class_t session_class) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_first_attach"); } static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, qd_iterator_t* reply_to) @@ -883,6 +885,8 @@ static void qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_itera static void qdr_tcp_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_second_attach"); + void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; @@ -907,11 +911,14 @@ static void qdr_tcp_second_attach(void *context, qdr_link_t *link, static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_detach"); + assert(false); } static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_flow"); void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; @@ -926,27 +933,32 @@ static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_offer"); } static void qdr_tcp_drained(void *context, qdr_link_t *link) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained"); } static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained"); } static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_push"); return qdr_link_process_deliveries(tcp_adaptor->core, link, limit); } static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_deliver"); void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; @@ -995,32 +1007,44 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t static int qdr_tcp_get_credit(void *context, qdr_link_t *link) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_get_credit"); return 10; } static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_delivery_update"); void* link_context = qdr_link_get_context(qdr_delivery_link(dlv)); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery update", tc->conn_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery update disp: %"PRIu64", settled: %s", + tc->conn_id, disp, settled ? "true" : "false"); + + // + // If one of the streaming deliveries is ever settled, the connection must be torn down. + // + if (settled) { + pn_raw_connection_close(tc->socket); + } } } static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) { - + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_close"); } static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_trace"); } static void qdr_tcp_activate(void *notused, qdr_connection_t *c) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate"); void *context = qdr_connection_get_context(c); if (context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; @@ -1080,7 +1104,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) static void qdr_tcp_adaptor_final(void *adaptor_context) { - qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Shutting down TCP protocol adaptor"); + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Shutting down TCP protocol adaptor"); qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context; qd_tcp_listener_t *tl = DEQ_HEAD(adaptor->listeners); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org