This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 3c933ecce6c97e47bf155845b8a275ad63e35182
Author: Ted Ross <tr...@apache.org>
AuthorDate: Wed Nov 11 09:19:33 2020 -0500

    DISPATCH_1829 - Patch from Gordon Sim
---
 src/adaptors/tcp_adaptor.c | 29 +++++++++++++++++++++++------
 1 file changed, 23 insertions(+), 6 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index d006d89..c585553 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -50,6 +50,7 @@ struct qdr_tcp_connection_t {
     qdr_delivery_t       *instream;
     qdr_delivery_t       *outstream;
     bool                  ingress;
+    bool                  flow_enabled;
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if egress_dispatcher=true
     qd_timer_t           *activate_timer;
@@ -131,10 +132,17 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn)
 static int handle_incoming(qdr_tcp_connection_t *conn)
 {
     //
-    // Don't initiate an ingress stream message if we don't yet have a reply-to address.
+    // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit.
     //
-    if (!conn->instream && conn->ingress && !conn->reply_to)
+    if (!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) {
+        if (conn->ingress && !conn->reply_to) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id);
+        }
+        if (!conn->flow_enabled) {
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for credit to initiate message", conn->conn_id, conn->outgoing_id);
+        }
         return 0;
+    }
 
     qd_buffer_list_t buffers;
     DEQ_INIT(buffers);
@@ -351,12 +359,12 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
             conn->outgoing_buff_idx   = 0;
             conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS);
 
-            if (conn->outgoing_buff_count == 0) {
-                // The incoming stream has no new data to send
-                break;
-            } else if (conn->outgoing_buff_count > 0) {
+            if (conn->outgoing_buff_count > 0) {
                 // Send the data just returned
                 read_more_body = write_outgoing_buffs(conn);
+            } else {
+                // The incoming stream has no new data to send
+                break;
             }
         }
 
@@ -904,6 +912,15 @@ static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error,
 
 static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit)
 {
+    void* link_context = qdr_link_get_context(link);
+    if (link_context) {
+        qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context;
+        if (!conn->flow_enabled && credit > 0) {
+            conn->flow_enabled = true;
+            qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Flow enabled", conn->conn_id, conn->outgoing_id);
+            handle_incoming(conn);
+        }
+    }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to