This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new d288c21 DISPATCH-1878: Handle half-closed TCP connections without losing data d288c21 is described below commit d288c21fe5ecc7a2d78eaf1f38e973fce2356e45 Author: Chuck Rolke <c...@apache.org> AuthorDate: Fri Apr 23 09:27:15 2021 -0400 DISPATCH-1878: Handle half-closed TCP connections without losing data Restructure the code several ways to handle closed reads and closed writes without necessarily closing the whole connection and losing data in flight. This is a squashed and rebased commit of work during development and tracked under PR#1129. This closes #1129 --- src/adaptors/tcp_adaptor.c | 539 ++++++++++++++++++++++++++++++--------------- 1 file changed, 365 insertions(+), 174 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index eb10f8a..9fcf267 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -57,11 +57,14 @@ struct qdr_tcp_connection_t { qdr_delivery_t *outstream; bool ingress; bool flow_enabled; + bool incoming_started; bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list - bool raw_closed_read; - bool raw_closed_write; + bool raw_closed_read; // proton event seen + bool raw_closed_write; // proton event seen or write_close called + bool raw_read_shutdown; // stream closed + bool read_eos_seen; qdr_delivery_t *initial_delivery; qd_timer_t *activate_timer; qd_bridge_config_t config; @@ -116,6 +119,26 @@ static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) return conn->instream ? conn->incoming_id : conn->outgoing_id; } +static inline const char * qdr_link_direction_name(const qdr_link_t *link) +{ + assert(link); + return qdr_link_direction(link) == QD_OUTGOING ? "outgoing" : "incoming"; +} + +static inline const char * qdr_tcp_connection_role_name(const qdr_tcp_connection_t *tc) +{ + assert(tc); + return tc->ingress ? "listener" : "connector"; +} + +static const char * qdr_tcp_quadrant_id(const qdr_tcp_connection_t *tc, const qdr_link_t *link) +{ + if (tc->ingress) + return link->link_direction == QD_INCOMING ? "(listener incoming)" : "(listener outgoing)"; + else + return link->link_direction == QD_INCOMING ? "(connector incoming)" : "(connector outgoing)"; +} + static void on_activate(void *context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; @@ -136,22 +159,22 @@ static void grant_read_buffers(qdr_tcp_connection_t *conn) pn_raw_buffer_t raw_buffers[READ_BUFFERS]; // Give proactor more read buffers for the socket - if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { - size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Granted %zu read buffers", conn->conn_id, desired); - while (desired) { - size_t i; - for (i = 0; i < desired && i < READ_BUFFERS; ++i) { - qd_buffer_t *buf = qd_buffer(); - raw_buffers[i].bytes = (char*) qd_buffer_base(buf); - raw_buffers[i].capacity = qd_buffer_capacity(buf); - raw_buffers[i].size = 0; - raw_buffers[i].offset = 0; - raw_buffers[i].context = (uintptr_t) buf; - } - desired -= i; - pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); + size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Granting %zu to pn_raw_connection_give_read_buffers()", + conn->conn_id, conn->incoming_id, desired); + while (desired) { + size_t i; + for (i = 0; i < desired && i < READ_BUFFERS; ++i) { + qd_buffer_t *buf = qd_buffer(); + raw_buffers[i].bytes = (char*) qd_buffer_base(buf); + raw_buffers[i].capacity = qd_buffer_capacity(buf); + raw_buffers[i].size = 0; + raw_buffers[i].offset = 0; + raw_buffers[i].context = (uintptr_t) buf; } + desired -= i; + pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); } } @@ -175,6 +198,9 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) if (tc->pn_raw_conn) { sys_atomic_set(&tc->q2_restart, 1); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] q2 unblocked: call pn_raw_connection_wake()", + tc->conn_id); pn_raw_connection_wake(tc->pn_raw_conn); } @@ -182,36 +208,14 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) } -// Fetch incoming raw incoming buffers from proton and pass them to -// existing delivery or create a new delivery. -// If close is pending then do not give more buffers to proton. -static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) +// Extract buffers and their bytes from raw connection. +// * Proton decides how many buffers are to be taken. +// * Buffers with no data are freed. +// * Buffers with data are appended to caller's buffers list. +// * Add received byte count to connection stats +// * Return the count of bytes in the buffers list +static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers) { - // - // Don't initiate an ingress stream message if we don't yet have a reply-to address and credit. - // - if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn) && !conn->instream && ((conn->ingress && !conn->reply_to) || !conn->flow_enabled)) { - if (conn->ingress && !conn->reply_to) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address to initiate message", conn->conn_id, conn->outgoing_id); - } - if (!conn->flow_enabled) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Waiting for credit to initiate message", conn->conn_id, conn->outgoing_id); - } - return 0; - } - - // - // Don't read from proton if in Q2 holdoff - // - if (conn->q2_blocked) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] handle_incoming q2_blocked", conn->conn_id); - return 0; - } - - // Read all buffers available from proton. - // Collect buffers for ingress; free empty buffers. - qd_buffer_list_t buffers; - DEQ_INIT(buffers); pn_raw_buffer_t raw_buffers[READ_BUFFERS]; size_t n; int count = 0; @@ -221,31 +225,75 @@ static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) qd_buffer_t *buf = (qd_buffer_t*) raw_buffers[i].context; qd_buffer_insert(buf, raw_buffers[i].size); count += raw_buffers[i].size; + + assert(raw_buffers[i].size == qd_buffer_size(buf)); if (raw_buffers[i].size > 0) { - DEQ_INSERT_TAIL(buffers, buf); + DEQ_INSERT_TAIL(*buffers, buf); } else { qd_buffer_free(buf); free_count++; } } } - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Took %zu read buffers", conn->conn_id, DEQ_SIZE(buffers)); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Freed %i read buffers", conn->conn_id, free_count); - // Only grant more buffers to proton for reading if close is not pending - if (!close_pending) { - grant_read_buffers(conn); + if (count > 0) { + // account for any incoming bytes just read + conn->last_in_time = tcp_adaptor->core->uptime_ticks; + conn->bytes_in += count; } - if (conn->instream) { - qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); - if (conn->q2_blocked) { - // note: unit tests grep for this log! - qd_log(tcp_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] client link blocked on Q2 limit", conn->conn_id); - } - qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); - } else { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] pn_raw_connection_take_read_buffers() took %zu, freed %i", + conn->conn_id, DEQ_SIZE(*buffers), free_count); + + return count; +} + + +// Fetch incoming raw incoming buffers from proton and pass them to a delivery. +// Create a new delivery if necessary. +// Return number of bytes read from raw connection +static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) +{ + qd_log_source_t *log = tcp_adaptor->log_source; + + qd_log(log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. read_closed:%s, flow_enabled:%s", + conn->conn_id, conn->incoming_id, msg, + qdr_tcp_connection_role_name(conn), + conn->raw_closed_read ? "T" : "F", + conn->flow_enabled ? "T" : "F"); + + if (conn->raw_read_shutdown) { + // Drain all read buffers that may still be in the raw connection + qd_log(log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] handle_incoming %s for %s connection. drain read buffers", + conn->conn_id, conn->incoming_id, msg, + qdr_tcp_connection_role_name(conn)); + qd_buffer_list_t buffers; + DEQ_INIT(buffers); + handle_incoming_raw_read(conn, &buffers); + qd_buffer_list_free_buffers(&buffers); + return 0; + } + + // Don't initiate an ingress stream message + // if we don't yet have a reply-to address and credit. + if (conn->ingress && !conn->reply_to) { + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Waiting for reply-to address before initiating ingress stream message", + conn->conn_id, conn->incoming_id); + return 0; + } + if (!conn->flow_enabled) { + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Waiting for credit before initiating ingress stream message", + conn->conn_id, conn->incoming_id); + return 0; + } + + // Ensure existence of ingress stream message + if (!conn->instream) { qd_message_t *msg = qd_message(); qd_message_set_stream_annotation(msg, true); @@ -256,14 +304,19 @@ static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) qd_compose_insert_null(props); // user-id if (conn->ingress) { qd_compose_insert_string(props, conn->config.address); // to - qd_compose_insert_string(props, conn->global_id); // subject - qd_compose_insert_string(props, conn->reply_to); // reply-to - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating ingress to: %s reply: %s", conn->conn_id, conn->incoming_id, conn->config.address, conn->reply_to); + qd_compose_insert_string(props, conn->global_id); // subject + qd_compose_insert_string(props, conn->reply_to); // reply-to + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream incoming link for %s connection to: %s reply: %s", + conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn), + conn->config.address, conn->reply_to); } else { - qd_compose_insert_string(props, conn->reply_to); // to - qd_compose_insert_string(props, conn->global_id); // subject - qd_compose_insert_null(props); // reply-to - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating egress to: %s", conn->conn_id, conn->incoming_id, conn->reply_to); + qd_compose_insert_string(props, conn->reply_to); // to + qd_compose_insert_string(props, conn->global_id); // subject + qd_compose_insert_null(props); // reply-to + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Initiating ingress stream incoming link for %s connection to: %s", + conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn), conn->reply_to); } //qd_compose_insert_null(props); // correlation-id //qd_compose_insert_null(props); // content-type @@ -275,11 +328,6 @@ static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) //qd_compose_insert_null(props); // reply-to-group-id qd_compose_end_list(props); - if (count > 0) { - props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props); - qd_compose_insert_binary_buffers(props, &buffers); - } - qd_message_compose_2(msg, props, false); qd_compose_free(props); @@ -288,18 +336,65 @@ static int handle_incoming_impl(qdr_tcp_connection_t *conn, bool close_pending) qd_message_set_q2_unblocked_handler(msg, qdr_tcp_q2_unblocked_handler, conn_sp); conn->instream = qdr_link_deliver(conn->incoming, msg, 0, false, 0, 0, 0, 0); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Initiating message with %i bytes", conn->conn_id, conn->incoming_id, count); + + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"][D%"PRIu64"] Initiating ingress stream message with 0 bytes", + conn->conn_id, conn->incoming_id, conn->instream->delivery_id); + + conn->incoming_started = true; + } + + // Don't read from proton if in Q2 holdoff + if (conn->q2_blocked) { + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] handle_incoming q2_blocked for %s connection", + conn->conn_id, qdr_tcp_connection_role_name(conn)); + return 0; } - return count; -} + // Read all buffers available from proton. + // Collect buffers for ingress; free empty buffers. + qd_buffer_list_t buffers; + DEQ_INIT(buffers); + int count = handle_incoming_raw_read(conn, &buffers); -static int handle_incoming(qdr_tcp_connection_t *conn) -{ - // Normal incoming runs with no close pending - return handle_incoming_impl(conn, false); + // Grant more buffers to proton for reading if read side is still open + if (!conn->raw_closed_read) { + // normal path - keep on processing + grant_read_buffers(conn); + } + + // Push the bytes just read into the streaming message + if (count > 0) { + qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers, &conn->q2_blocked); + if (conn->q2_blocked) { + // note: unit tests grep for this log! + qd_log(log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] client link blocked on Q2 limit", + conn->conn_id, conn->incoming_id); + } + qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", + conn->conn_id, conn->incoming_id, count); + } else { + assert (DEQ_SIZE(buffers) == 0); + } + + // Close the stream message if read side has closed + if (conn->raw_closed_read) { + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] close instream delivery", + conn->conn_id, conn->incoming_id); + qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); + qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); + conn->raw_read_shutdown = true; + } + + return count; } + static void flush_outgoing_buffs(qdr_tcp_connection_t *conn) { // Flush buffers staged for writing to raw conn @@ -342,21 +437,29 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) static void handle_disconnected(qdr_tcp_connection_t* conn) { if (conn->instream) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", conn->conn_id, conn->incoming_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close instream", + conn->conn_id, conn->incoming_id); qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream"); } if (conn->outstream) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected close outstream", conn->conn_id, conn->outgoing_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream", + conn->conn_id, conn->outgoing_id); qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream"); } if (conn->incoming) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", conn->conn_id, conn->incoming_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", + conn->conn_id, conn->incoming_id); qdr_link_detach(conn->incoming, QD_LOST, 0); } if (conn->outgoing) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", conn->conn_id, conn->outgoing_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", + conn->conn_id, conn->outgoing_id); qdr_link_detach(conn->outgoing, QD_LOST, 0); } if (conn->qdr_conn) { @@ -393,9 +496,14 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r } else { switch (stream_data_result) { case QD_MESSAGE_STREAM_DATA_NO_MORE: - qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] EOS", conn->conn_id); break; + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, + "[C%"PRIu64"] EOS", conn->conn_id); + conn->read_eos_seen = true; + break; case QD_MESSAGE_STREAM_DATA_INVALID: - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); break; + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, + "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); + break; default: break; } @@ -459,11 +567,12 @@ static bool write_outgoing_buffs(qdr_tcp_connection_t *conn) bytes_written += conn->outgoing_buffs[conn->outgoing_buff_idx + i].size; } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, - "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" of %"PRIu64")", conn->conn_id, i+1, used); + "[C%"PRIu64"] empty buffer can't be written (%"PRIu64" of %"PRIu64")", + conn->conn_id, i+1, used); } } qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"] Writing %i bytes", conn->conn_id, bytes_written); + "[C%"PRIu64"] pn_raw_connection_write_buffers wrote %i bytes", conn->conn_id, bytes_written); conn->outgoing_buff_count -= used; conn->outgoing_buff_idx += used; @@ -501,8 +610,14 @@ static void handle_outgoing(qdr_tcp_connection_t *conn) } } - if (qd_message_receive_complete(msg) || qd_message_send_complete(msg)) { - pn_raw_connection_close(conn->pn_raw_conn); + if (conn->read_eos_seen) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] handle_outgoing calling pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s", + conn->conn_id, qd_message_receive_complete(msg) ? "T" : "F", qd_message_send_complete(msg) ? "T" : "F"); + sys_mutex_lock(conn->activation_lock); + conn->raw_closed_write = true; + sys_mutex_unlock(conn->activation_lock); + pn_raw_connection_write_close(conn->pn_raw_conn); } } } @@ -600,8 +715,6 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) tc->opened_time = tcp_adaptor->core->uptime_ticks; qdr_link_set_context(tc->incoming, tc); - grant_read_buffers(tc); - qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); action->args.general.context_1 = tc; qdr_action_enqueue(tcp_adaptor->core, action); @@ -614,13 +727,17 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void switch (pn_event_type(e)) { case PN_RAW_CONNECTION_CONNECTED: { if (conn->ingress) { - qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id); qdr_tcp_connection_ingress_accept(conn); + qd_log(log, QD_LOG_INFO, + "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Ingress accepted to %s from %s (global_id=%s)", + conn->conn_id, conn->config.host_port, conn->remote_address, conn->global_id); break; } else { conn->remote_address = get_address_string(conn->pn_raw_conn); conn->opened_time = tcp_adaptor->core->uptime_ticks; - qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", conn->conn_id, conn->remote_address); + qd_log(log, QD_LOG_INFO, + "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Egress connected to %s", + conn->conn_id, conn->remote_address); if (!!conn->initial_delivery) { qdr_tcp_open_server_side_connection(conn); } @@ -630,25 +747,28 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } } case PN_RAW_CONNECTION_CLOSED_READ: { - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); - conn->q2_blocked = false; - handle_incoming_impl(conn, true); + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", + conn->conn_id, conn->incoming_id); sys_mutex_lock(conn->activation_lock); + conn->q2_blocked = false; conn->raw_closed_read = true; sys_mutex_unlock(conn->activation_lock); - pn_raw_connection_close(conn->pn_raw_conn); + handle_incoming(conn, "PNRC_CLOSED_READ"); break; } case PN_RAW_CONNECTION_CLOSED_WRITE: { - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", + conn->conn_id); sys_mutex_lock(conn->activation_lock); conn->raw_closed_write = true; sys_mutex_unlock(conn->activation_lock); - pn_raw_connection_close(conn->pn_raw_conn); break; } case PN_RAW_CONNECTION_DISCONNECTED: { - qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED", conn->conn_id); + qd_log(log, QD_LOG_INFO, + "[C%"PRIu64"] PN_RAW_CONNECTION_DISCONNECTED", + conn->conn_id); sys_mutex_lock(conn->activation_lock); conn->pn_raw_conn = 0; sys_mutex_unlock(conn->activation_lock); @@ -656,38 +776,53 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_WRITE_BUFFERS", conn->conn_id); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_WRITE_BUFFERS", + conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} handle_outgoing(conn); break; } case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS", conn->conn_id); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_NEED_READ_BUFFERS", + conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} - handle_incoming(conn); + if (conn->incoming_started) { + grant_read_buffers(conn); + handle_incoming(conn, "PNRC_NEED_READ_BUFFERS"); + } break; } case PN_RAW_CONNECTION_WAKE: { - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", conn->conn_id); - sys_mutex_lock(conn->activation_lock); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_WAKE", + conn->conn_id); if (sys_atomic_set(&conn->q2_restart, 0)) { - sys_mutex_unlock(conn->activation_lock); - // note: unit tests grep for this log! - qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] client link unblocked from Q2 limit", conn->conn_id); + sys_mutex_lock(conn->activation_lock); conn->q2_blocked = false; - handle_incoming(conn); - } - else { sys_mutex_unlock(conn->activation_lock); + // note: unit tests grep for this log! + qd_log(log, QD_LOG_TRACE, + "[C%"PRIu64"] client link unblocked from Q2 limit", + conn->conn_id); + handle_incoming(conn, "PNRC_WAKE after Q2 unblock"); } while (qdr_connection_process(conn->qdr_conn)) {} break; } case PN_RAW_CONNECTION_READ: { - int read = handle_incoming(conn); - conn->last_in_time = tcp_adaptor->core->uptime_ticks; - conn->bytes_in += read; - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read %"PRIu64" bytes", conn->conn_id, read, conn->bytes_in); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_READ Event ", + conn->conn_id); + int read = 0; + if (conn->incoming_started) { + // Streaming message exists. Process read normally. + read = handle_incoming(conn, "PNRC_READ"); + } + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_READ Read %i bytes. Total read %"PRIu64" bytes", + conn->conn_id, read, conn->bytes_in); while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -705,7 +840,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } conn->last_out_time = tcp_adaptor->core->uptime_ticks; conn->bytes_out += written; - qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN Wrote %zu bytes. Total written %"PRIu64" bytes", conn->conn_id, written, conn->bytes_out); + qd_log(log, QD_LOG_DEBUG, + "[C%"PRIu64"] PN_RAW_CONNECTION_WRITTEN pn_raw_connection_take_written_buffers wrote %zu bytes. Total written %"PRIu64" bytes", + conn->conn_id, written, conn->bytes_out); while (qdr_connection_process(conn->qdr_conn)) {} break; } @@ -732,6 +869,7 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste //event on another thread, which is where the rest of the //initialisation will happen, through a call to //qdr_tcp_connection_ingress_accept + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] call pn_listener_raw_accept()", tc->conn_id); pn_listener_raw_accept(listener->pn_listener, tc->pn_raw_conn); return tc; } @@ -742,21 +880,21 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port; qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host); - qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, - false, //bool is_authenticated, - true, //bool opened, - "", //char *sasl_mechanisms, + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, + false, //bool is_authenticated, + true, //bool opened, + "", //char *sasl_mechanisms, QD_OUTGOING, //qd_direction_t dir, - host, //const char *host, - "", //const char *ssl_proto, - "", //const char *ssl_cipher, - "", //const char *user, - "TcpAdaptor", //const char *container, - 0, //pn_data_t *connection_properties, - 0, //int ssl_ssf, - false, //bool ssl, - "", // peer router version, - false); // streaming links + host, //const char *host, + "", //const char *ssl_proto, + "", //const char *ssl_cipher, + "", //const char *user, + "TcpAdaptor",//const char *container, + 0, //pn_data_t *connection_properties, + 0, //int ssl_ssf, + false, //bool ssl, + "", // peer router version, + false); // streaming links qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, tcp_adaptor->adaptor, @@ -782,6 +920,12 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) // This attach passes the ownership of the delivery from the core-side connection and link // to the adaptor-side outgoing connection and link. + uint64_t i_conn_id = 0; + uint64_t i_link_id = 0; + if (!!tc->initial_delivery) { + i_conn_id = tc->initial_delivery->conn_id; + i_link_id = tc->initial_delivery->link_id; + } tc->outgoing = qdr_link_first_attach(conn, QD_OUTGOING, source, //qdr_terminus_t *source, @@ -792,8 +936,10 @@ static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) tc->initial_delivery, &(tc->outgoing_id)); if (!!tc->initial_delivery) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" initial_delivery ownership passed to "DLV_FMT, - DLV_ARGS(tc->initial_delivery), tc->outgoing->conn_id, tc->outgoing->identity, tc->initial_delivery->delivery_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + DLV_FMT" initial_delivery ownership passed to "DLV_FMT, + i_conn_id, i_link_id, tc->initial_delivery->delivery_id, + tc->outgoing->conn_id, tc->outgoing->identity, tc->initial_delivery->delivery_id); qdr_delivery_decref(tcp_adaptor->core, tc->initial_delivery, "tcp-adaptor - passing initial_delivery into new link"); tc->initial_delivery = 0; } @@ -823,14 +969,17 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi tc->conn_id = qd_server_allocate_connection_id(tc->server); // - // If this is the egress dispatcher, set up the core connection now. Otherwise, set up a physical - // raw connection and wait until we are running in that connection's context to set up the core + // If this is the egress dispatcher, set up the core connection now. + // Otherwise, set up a physical raw connection and wait until we are + // running in that connection's context to set up the core // connection. // if (tc->egress_dispatcher) qdr_tcp_open_server_side_connection(tc); else { - qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", tc->conn_id, tc->config.host_port); + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, + "[C%"PRIu64"] call pn_proactor_raw_connect(). Egress connecting to: %s", + tc->conn_id, tc->config.host_port); tc->pn_raw_conn = pn_raw_connection(); pn_raw_connection_set_context(tc->pn_raw_conn, tc); pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->pn_raw_conn, tc->config.host_port); @@ -900,7 +1049,7 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c } case PN_LISTENER_ACCEPT: { - qd_log(log, QD_LOG_INFO, "PN_LISTENER_ACCEPT Accepting TCP connection on %s", host_port); + qd_log(log, QD_LOG_INFO, "PN_LISTENER_ACCEPT Accepting TCP connection to %s", host_port); qdr_tcp_connection_ingress(li); break; } @@ -977,7 +1126,9 @@ void qd_dispatch_delete_tcp_listener(qd_dispatch_t *qd, void *impl) pn_listener_close(li->pn_listener); } DEQ_REMOVE(tcp_adaptor->listeners, li); - qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port); + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, + "Deleted TcpListener for %s, %s:%s", + li->config.address, li->config.host, li->config.port); qd_tcp_listener_decref(li); } } @@ -1033,7 +1184,9 @@ void qd_dispatch_delete_tcp_connector(qd_dispatch_t *qd, void *impl) if (ct) { //need to close the pseudo-connection used for dispatching //deliveries out to live connnections: - qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "Deleted TcpConnector for %s, %s:%s", ct->config.address, ct->config.host, ct->config.port); + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, + "Deleted TcpConnector for %s, %s:%s", + ct->config.address, ct->config.host, ct->config.port); close_egress_dispatcher((qdr_tcp_connection_t*) ct->dispatcher); DEQ_REMOVE(tcp_adaptor->connectors, ct); qd_tcp_connector_decref(ct); @@ -1052,7 +1205,9 @@ static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link void *tcontext = qdr_connection_get_context(conn); if (tcontext) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_first_attach: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_first_attach: no link context"); assert(false); @@ -1078,21 +1233,23 @@ static void qdr_tcp_second_attach(void *context, qdr_link_t *link, if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; if (qdr_link_direction(link) == QD_OUTGOING) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->outgoing_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] %s qdr_tcp_second_attach", + tc->conn_id, tc->outgoing_id, + qdr_tcp_quadrant_id(tc, link)); if (tc->ingress) { qdr_tcp_connection_copy_reply_to(tc, qdr_terminus_get_address(source)); // for ingress, can start reading from socket once we have // a reply to address, as that is when we are able to send // out a message - grant_read_buffers(tc); - handle_incoming(tc); + handle_incoming(tc, "qdr_tcp_second_attach"); } qdr_link_flow(tcp_adaptor->core, link, 10, false); } else if (!tc->ingress) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_second_attach", tc->conn_id, tc->incoming_id); - //for egress we can start reading from the socket once we - //have the link to send messages over - grant_read_buffers(tc); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] %s qdr_tcp_second_attach", + tc->conn_id, tc->incoming_id, + qdr_tcp_quadrant_id(tc, link)); } } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_second_attach: no link context"); @@ -1115,11 +1272,13 @@ static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; if (!conn->flow_enabled && credit > 0) { conn->flow_enabled = true; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d", + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: Flow enabled, credit=%d", conn->conn_id, conn->outgoing_id, credit); - handle_incoming(conn); + handle_incoming(conn, "qdr_tcp_flow"); } else { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d", + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_flow: No action. enabled:%s, credit:%d", conn->conn_id, qdr_tcp_conn_linkid(conn), conn->flow_enabled?"T":"F", credit); } } else { @@ -1134,7 +1293,9 @@ static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count) void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_offer: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_offer: no link context"); assert(false); @@ -1148,7 +1309,9 @@ static void qdr_tcp_drained(void *context, qdr_link_t *link) void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drained: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drained: no link context"); assert(false); @@ -1161,7 +1324,9 @@ static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode) void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_drain: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_drain: no link context"); assert(false); @@ -1174,7 +1339,9 @@ static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_push", + conn->conn_id, qdr_tcp_conn_linkid(conn)); return qdr_link_process_deliveries(tcp_adaptor->core, link, limit); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_push: no link context"); @@ -1189,9 +1356,11 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_deliver Delivery event", DLV_ARGS(delivery)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + DLV_FMT" qdr_tcp_deliver Delivery event", DLV_ARGS(delivery)); if (tc->egress_dispatcher) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" tcp_adaptor initiating egress connection", DLV_ARGS(delivery)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + DLV_FMT" tcp_adaptor initiating egress connection", DLV_ARGS(delivery)); qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); return QD_DELIVERY_MOVED_TO_NEW_LINK; } else if (!tc->outstream) { @@ -1213,20 +1382,25 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t tc->incoming = qdr_link_first_attach(tc->qdr_conn, QD_INCOMING, qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, + target, //qdr_terminus_t *target, "tcp.egress.in", //const char *name, 0, //const char *terminus_addr, false, NULL, &(tc->incoming_id)); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Create Link to %s", tc->conn_id, tc->incoming->identity, tc->reply_to); + assert(tc); + assert(tc->incoming); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] %s Created link to %s", + tc->conn_id, tc->incoming->identity, + qdr_tcp_quadrant_id(tc, tc->incoming), tc->reply_to); qdr_link_set_context(tc->incoming, tc); //add this connection to those visible through management now that we have the global_id qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); action->args.general.context_1 = tc; qdr_action_enqueue(tcp_adaptor->core, action); - handle_incoming(tc); + handle_incoming(tc, "qdr_tcp_deliver"); } } handle_outgoing(tc); @@ -1243,7 +1417,9 @@ static int qdr_tcp_get_credit(void *context, qdr_link_t *link) void* link_context = qdr_link_get_context(link); if (link_context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_get_credit: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_get_credit: no link context"); assert(false); @@ -1257,13 +1433,19 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t void* link_context = qdr_link_get_context(qdr_delivery_link(dlv)); if (link_context) { qdr_tcp_connection_t* tc = (qdr_tcp_connection_t*) link_context; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, DLV_FMT" qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s", + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + DLV_FMT" qdr_tcp_delivery_update: disp: %"PRIu64", settled: %s", DLV_ARGS(dlv), disp, settled ? "true" : "false"); - // - // If one of the streaming deliveries is ever settled, the connection must be torn down. - // if (settled) { + // the only settlement occurs when the initial delivery is + // settled, which occurs when the connector is unable to + // connect to the configured tcp endpoint, so in this case + // we can just close the connection + // (The end of the message is used to convey half closed status) + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + DLV_FMT" qdr_tcp_delivery_update: call pn_raw_connection_close()", + DLV_ARGS(dlv)); pn_raw_connection_close(tc->pn_raw_conn); } } else { @@ -1278,7 +1460,8 @@ static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_ void *tcontext = qdr_connection_get_context(conn); if (tcontext) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_close: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_close: no connection context"); assert(false); @@ -1291,9 +1474,12 @@ static void qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace void *tcontext = qdr_connection_get_context(conn); if (tcontext) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) tcontext; - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", conn->conn_id, qdr_tcp_conn_linkid(conn)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_conn_trace: NOOP", + conn->conn_id, qdr_tcp_conn_linkid(conn)); } else { - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "qdr_tcp_conn_trace: no connection context"); + qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, + "qdr_tcp_conn_trace: no connection context"); assert(false); } } @@ -1304,8 +1490,9 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c) if (context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; sys_mutex_lock(conn->activation_lock); - if (conn->pn_raw_conn && !(conn->raw_closed_read || conn->raw_closed_write)) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: waking raw connection", conn->conn_id); + if (conn->pn_raw_conn && !(conn->raw_closed_read && conn->raw_closed_write)) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] qdr_tcp_activate: call pn_raw_connection_wake()", conn->conn_id); pn_raw_connection_wake(conn->pn_raw_conn); sys_mutex_unlock(conn->activation_lock); } else if (conn->activate_timer) { @@ -1316,11 +1503,13 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c) // received. Prior to that however a subscribing link (and // its associated connection must be setup), for which we // fake wakeup by using a timer. - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", conn->conn_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] qdr_tcp_activate: schedule activate_timer", conn->conn_id); qd_timer_schedule(conn->activate_timer, 0); } else { sys_mutex_unlock(conn->activation_lock); - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", conn->conn_id); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"] qdr_tcp_activate: Cannot activate", conn->conn_id); } } else { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "qdr_tcp_activate: no connection context"); @@ -1342,7 +1531,7 @@ static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) adaptor->adaptor = qdr_protocol_adaptor(core, "tcp", // name adaptor, // context - qdr_tcp_activate, // activate + qdr_tcp_activate, qdr_tcp_first_attach, qdr_tcp_second_attach, qdr_tcp_detach, @@ -1563,7 +1752,8 @@ static qdr_tcp_connection_t *find_by_identity(qdr_core_t *core, qd_iterator_t *i void qdra_tcp_connection_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "query for first tcp connection (%i)", offset); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "query for first tcp connection (%i)", offset); query->status = QD_AMQP_OK; if (offset >= DEQ_SIZE(tcp_adaptor->connections)) { @@ -1618,7 +1808,8 @@ void qdra_tcp_connection_get_CT(qdr_core_t *core, if (!identity) { query->status = QD_AMQP_BAD_REQUEST; query->status.description = "Name not supported. Identity required"; - qd_log(core->agent_log, QD_LOG_ERROR, "Error performing READ of %s: %s", TCP_CONNECTION_TYPE, query->status.description); + qd_log(core->agent_log, QD_LOG_ERROR, + "Error performing READ of %s: %s", TCP_CONNECTION_TYPE, query->status.description); } else { conn = find_by_identity(core, identity); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org