This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new c6b2055 DISPATCH-1947: TCP Adaptor flow control c6b2055 is described below commit c6b205574ed5925cde40d8f126902d28ff5a32e2 Author: Chuck Rolke <c...@apache.org> AuthorDate: Tue Feb 23 16:48:05 2021 -0500 DISPATCH-1947: TCP Adaptor flow control This closes #1056 --- src/adaptors/tcp_adaptor.c | 88 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 69bed1a..0b83123 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -17,6 +17,7 @@ * under the License. */ +#include "tcp_adaptor.h" #include <proton/condition.h> #include <proton/listener.h> #include <proton/netaddr.h> @@ -26,7 +27,6 @@ #include "qpid/dispatch/ctools.h" #include "qpid/dispatch/protocol_adaptor.h" #include "delivery.h" -#include "tcp_adaptor.h" #include <stdio.h> #include <inttypes.h> @@ -78,6 +78,9 @@ struct qdr_tcp_connection_t { int outgoing_buff_count; // number of buffers with data int outgoing_buff_idx; // first buffer with data + sys_atomic_t q2_restart; // signal to resume receive + bool q2_blocked; // stop reading from raw conn + DEQ_LINKS(qdr_tcp_connection_t); }; @@ -148,7 +151,37 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) } } -static int handle_incoming(qdr_tcp_connection_t *conn) + +// Per-message callback to resume receiving after Q2 is unblocked on the +// incoming link. +// This routine must be thread safe: the thread on which it is running +// is not an IO thread that owns the underlying pn_raw_conn. +// +void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) +{ + qdr_tcp_connection_t *tc = (qdr_tcp_connection_t*)qd_alloc_deref_safe_ptr(&context); + if (tc == 0) { + // bad news. + assert(false); + return; + } + + // prevent the tc from being deleted while running: + sys_mutex_lock(tc->activation_lock); + + if (tc && tc->pn_raw_conn) { + sys_atomic_set(&tc->q2_restart, 1); + pn_raw_connection_wake(tc->pn_raw_conn); + } + + sys_mutex_unlock(tc->activation_lock); +} + + +// Fetch incoming raw incoming buffers from proton and pass them to +// existing delivery or create a new delivery. +// If close is pending then do not give more buffers to proton. +static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) { // // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. @@ -163,6 +196,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn) return 0; } + // + // Don't read from proton if in Q2 holdoff + // + if (conn->q2_blocked) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming q2_blocked", conn->conn_id); + return 0; + } + + // Read all buffers available from proton. + // Collect buffers for ingress; free empty buffers. qd_buffer_list_t buffers; DEQ_INIT(buffers); pn_raw_buffer_t raw_buffers[READ_BUFFERS]; @@ -182,14 +225,20 @@ static int handle_incoming(qdr_tcp_connection_t *conn) } } } - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers)); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count); - grant_read_buffers(conn); + + // Only grant more buffers to proton for reading if close is not pending + if (!close_pending) { + grant_read_buffers(conn); + } if (conn->instream) { - // @TODO(kgiusti): handle Q2 block event: - qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, 0); + qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); + if (conn->q2_blocked) { + // note: unit tests grep for this log! + qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", conn->conn_id); + } qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); } else { @@ -230,6 +279,10 @@ static int handle_incoming(qdr_tcp_connection_t *conn) qd_message_compose_2(msg, props, false); qd_compose_free(props); + // set up message q2 unblocked callback handler + qd_alloc_safe_ptr_t conn_sp = QD_SAFE_PTR_INIT(conn); + qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, conn_sp); + conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count); } @@ -237,10 +290,16 @@ static int handle_incoming(qdr_tcp_connection_t *conn) } +static int handle_incoming(qdr_tcp_connection_t *conn) +{ + // Normal incoming runs with no close pending + return handle_incoming_impl(conn, false); +} + static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) { // Flush buffers staged for writing to raw conn - // and free possible references to stream data objects. + // and release any references to stream data objects. if (conn->outgoing_buff_count > 0) { for (size_t i = conn->outgoing_buff_idx; i < conn->outgoing_buff_idx + conn->outgoing_buff_count; @@ -263,10 +322,10 @@ static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freeing tcp_connection %p", tc->conn_id, (void*) tc); free(tc->reply_to); free(tc->remote_address); free(tc->global_id); + sys_atomic_destroy(&tc->q2_restart); if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } @@ -278,7 +337,6 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) static void handle_disconnected(qdr_tcp_connection_t* conn) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_disconnected", conn->conn_id); if (conn->instream) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); @@ -552,8 +610,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void switch (pn_event_type(e)) { case PN_RAW_CONNECTION_CONNECTED: { if (conn->ingress) { - qdr_tcp_connection_ingress_accept(conn); qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id); + qdr_tcp_connection_ingress_accept(conn); break; } else { conn->remote_address = get_address_string(conn->pn_raw_conn); @@ -569,6 +627,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_CLOSED_READ: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); + conn->q2_blocked = false; + handle_incoming_impl(conn, true); conn->raw_closed_read = true; pn_raw_connection_close(conn->pn_raw_conn); break; @@ -601,6 +661,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_WAKE: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id); + if (sys_atomic_set(&conn->q2_restart, 0)) { + // note: unit tests grep for this log! + qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", conn->conn_id); + conn->q2_blocked = false; + handle_incoming(conn); + } while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -646,6 +712,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste tc->context.handler = &handle_connection_event; tc->config = listener->config; tc->server = listener->server; + sys_atomic_init(&tc->q2_restart, 0); tc->pn_raw_conn = pn_raw_connection(); pn_raw_connection_set_context(tc->pn_raw_conn, tc); //the following call will cause a PN_RAW_CONNECTION_CONNECTED @@ -739,6 +806,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi tc->context.handler = &handle_connection_event; tc->config = *config; tc->server = server; + sys_atomic_init(&tc->q2_restart, 0); tc->conn_id = qd_server_allocate_connection_id(tc->server); // --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org