This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 3c933ecce6c97e47bf155845b8a275ad63e35182 Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Nov 11 09:19:33 2020 -0500 DISPATCH_1829 - Patch from Gordon Sim --- src/adaptors/tcp_adaptor.c | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index d006d89..c585553 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -50,6 +50,7 @@ struct qdr_tcp_connection_t { qdr_delivery_t *instream; qdr_delivery_t *outstream; bool ingress; + bool flow_enabled; bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true qd_timer_t *activate_timer; @@ -131,10 +132,17 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) static int handle_incoming(qdr_tcp_connection_t *conn) { // - // Don't initiate an ingress stream message if we don't yet have a reply-to address. + // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. 
// - if (!conn->instream && conn->ingress && !conn->reply_to) + if (!conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) { + if (conn->ingress && !conn->reply_to) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id); + } + if (!conn->flow_enabled) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for credit to initiate message", conn->conn_id, conn->outgoing_id); + } return 0; + } qd_buffer_list_t buffers; DEQ_INIT(buffers); @@ -351,12 +359,12 @@ static void handle_outgoing(qdr_tcp_connection_t *conn) conn->outgoing_buff_idx = 0; conn->outgoing_buff_count = read_message_body(conn, msg, conn->outgoing_buffs, WRITE_BUFFERS); - if (conn->outgoing_buff_count == 0) { - // The incoming stream has no new data to send - break; - } else if (conn->outgoing_buff_count > 0) { + if (conn->outgoing_buff_count > 0) { // Send the data just returned read_more_body = write_outgoing_buffs(conn); + } else { + // The incoming stream has no new data to send + break; } } @@ -904,6 +912,15 @@ static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) { + void* link_context = qdr_link_get_context(link); + if (link_context) { + qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; + if (!conn->flow_enabled && credit > 0) { + conn->flow_enabled = true; + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Flow enabled", conn->conn_id, conn->outgoing_id); + handle_incoming(conn); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org