This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 27ac4be023ec8f7f69cf00bbf3441ee408240e86
Author: Ted Ross <tr...@apache.org>
AuthorDate: Thu Nov 12 14:02:40 2020 -0500

    DISPATCH-1826 - Drop raw connections when stream messages are settled.
---
 src/adaptors/tcp_adaptor.c | 30 +++++++++++++++++++++++++++---
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index c585553..742a42c 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -490,6 +490,7 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
             conn->opened_time = tcp_adaptor->core->uptime_ticks;
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id);
             while (qdr_connection_process(conn->conn)) {}
+            handle_outgoing(conn);
             break;
         }
     }
@@ -866,6 +867,7 @@ static void qdr_tcp_first_attach(void *context, 
qdr_connection_t *conn, qdr_link
                                  qdr_terminus_t *source, qdr_terminus_t 
*target,
                                  qd_session_class_t session_class)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_first_attach");
 }
 
 static void qdr_tcp_connection_copy_reply_to(qdr_tcp_connection_t* tc, 
qd_iterator_t* reply_to)
@@ -883,6 +885,8 @@ static void 
qdr_tcp_connection_copy_global_id(qdr_tcp_connection_t* tc, qd_itera
 static void qdr_tcp_second_attach(void *context, qdr_link_t *link,
                                   qdr_terminus_t *source, qdr_terminus_t 
*target)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_second_attach");
+    
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
@@ -907,11 +911,14 @@ static void qdr_tcp_second_attach(void *context, 
qdr_link_t *link,
 
 static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t 
*error, bool first, bool close)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_detach");
+    assert(false);
 }
 
 
 static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_flow");
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
@@ -926,27 +933,32 @@ static void qdr_tcp_flow(void *context, qdr_link_t *link, 
int credit)
 
 static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_offer");
 }
 
 
 static void qdr_tcp_drained(void *context, qdr_link_t *link)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained");
 }
 
 
 static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_drained");
 }
 
 
 static int qdr_tcp_push(void *context, qdr_link_t *link, int limit)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_push");
     return qdr_link_process_deliveries(tcp_adaptor->core, link, limit);
 }
 
 
 static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_deliver");
     void* link_context = qdr_link_get_context(link);
     if (link_context) {
             qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
@@ -995,32 +1007,44 @@ static uint64_t qdr_tcp_deliver(void *context, 
qdr_link_t *link, qdr_delivery_t
 
 static int qdr_tcp_get_credit(void *context, qdr_link_t *link)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_get_credit");
     return 10;
 }
 
 
 static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_delivery_update");
     void* link_context = qdr_link_get_context(qdr_delivery_link(dlv));
     if (link_context) {
         qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context;
-        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery 
update", tc->conn_id);
+        qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Delivery 
update disp: %"PRIu64", settled: %s",
+               tc->conn_id, disp, settled ? "true" : "false");
+
+        //
+        // If one of the streaming deliveries is ever settled, the connection 
must be torn down.
+        //
+        if (settled) {
+            pn_raw_connection_close(tc->socket);
+        }
     }
 }
 
 
 static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, 
qdr_error_t *error)
 {
-    
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_close");
 }
 
 
 static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool 
trace)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_conn_trace");
 }
 
 static void qdr_tcp_activate(void *notused, qdr_connection_t *c)
 {
+    qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate");
     void *context = qdr_connection_get_context(c);
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
@@ -1080,7 +1104,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void 
**adaptor_context)
 
 static void qdr_tcp_adaptor_final(void *adaptor_context)
 {
-    qd_log(tcp_adaptor->log_source, QD_LOG_CRITICAL, "Shutting down TCP 
protocol adaptor");
+    qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Shutting down TCP protocol 
adaptor");
     qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context;
 
     qd_tcp_listener_t *tl = DEQ_HEAD(adaptor->listeners);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to