This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new ffc4e86 DISPATCH-1857 - Handle asynchronous moving of a delivery to another link. Patch contents from Gordon Sim. This improves the stability of multiple, concurrent connection handling. This closes #932. ffc4e86 is described below commit ffc4e8649aac070a70688a99fc19dbb8f17e195e Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Dec 2 12:55:32 2020 -0500 DISPATCH-1857 - Handle asynchronous moving of a delivery to another link. Patch contents from Gordon Sim. This improves the stability of multiple, concurrent connection handling. This closes #932. --- include/qpid/dispatch/protocol_adaptor.h | 1 + src/adaptors/tcp_adaptor.c | 110 ++++++++++++++++++------------- src/router_core/router_core.c | 2 + src/router_core/transfer.c | 11 +++- 4 files changed, 78 insertions(+), 46 deletions(-) diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 93b23b4..ae7036f 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -203,6 +203,7 @@ typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode); */ typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit); +extern const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK; /** * qdr_link_deliver_t callback * diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 9c94c86..98df531 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -53,6 +53,7 @@ struct qdr_tcp_connection_t { bool flow_enabled; bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true + qdr_delivery_t *initial_delivery; qd_timer_t *activate_timer; qd_bridge_config_t config; qd_server_t *server; @@ -93,6 +94,7 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo static void handle_disconnected(qdr_tcp_connection_t* conn); static void 
free_qdr_tcp_connection(qdr_tcp_connection_t* conn); +static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc); static inline uint64_t qdr_tcp_conn_linkid(const qdr_tcp_connection_t *conn) { @@ -494,12 +496,16 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void case PN_RAW_CONNECTION_CONNECTED: { if (conn->ingress) { qdr_tcp_connection_ingress_accept(conn); - qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s", conn->conn_id, conn->remote_address); + qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted from %s (global_id=%s)", conn->conn_id, conn->remote_address, conn->global_id); break; } else { conn->remote_address = get_address_string(conn->socket); conn->opened_time = tcp_adaptor->core->uptime_ticks; qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connected", conn->conn_id); + if (!!conn->initial_delivery) { + qdr_tcp_open_server_side_connection(conn); + conn->initial_delivery = 0; + } while (qdr_connection_process(conn->conn)) {} handle_outgoing(conn); break; @@ -529,6 +535,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", conn->conn_id); while (qdr_connection_process(conn->conn)) {} + handle_incoming(conn); break; } case PN_RAW_CONNECTION_WAKE: { @@ -563,6 +570,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } default: + qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Unexpected Event: %d", conn->conn_id, pn_event_type(e)); break; } } @@ -586,35 +594,18 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste return tc; } -static void tcp_connector_establish(qdr_tcp_connection_t *conn) -{ - qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", conn->conn_id, conn->config.host_port); - conn->socket = pn_raw_connection(); - pn_raw_connection_set_context(conn->socket, conn); - 
pn_proactor_raw_connect(qd_server_proactor(conn->server), conn->socket, conn->config.host_port); -} -static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery) +static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc) { - qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); - ZERO(tc); - if (initial_delivery) { - tc->egress_dispatcher = false; - } else { - tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); - tc->egress_dispatcher = true; - } - tc->ingress = false; - tc->context.context = tc; - tc->context.handler = &handle_connection_event; - tc->config = *config; - tc->server = server; + const char *host = tc->egress_dispatcher ? "egress-dispatch" : tc->config.host_port; + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Opening server-side core connection %s", tc->conn_id, host); + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, false, //bool is_authenticated, true, //bool opened, "", //char *sasl_mechanisms, QD_OUTGOING, //qd_direction_t dir, - tc->egress_dispatcher ? 
"egress-dispatch" : tc->config.host_port, //const char *host, + host, //const char *host, "", //const char *ssl_proto, "", //const char *ssl_cipher, "", //const char *user, @@ -625,7 +616,6 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi "", // peer router version, false); // streaming links - tc->conn_id = qd_server_allocate_connection_id(tc->server); qdr_connection_t *conn = qdr_connection_opened(tcp_adaptor->core, tcp_adaptor->adaptor, false, @@ -650,20 +640,48 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi qdr_terminus_set_address(source, tc->config.address); tc->outgoing = qdr_link_first_attach(conn, - QD_OUTGOING, - source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "tcp.egress.out", //const char *name, - 0, //const char *terminus_addr, - !(tc->egress_dispatcher), - initial_delivery, - &(tc->outgoing_id)); + QD_OUTGOING, + source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "tcp.egress.out", //const char *name, + 0, //const char *terminus_addr, + !(tc->egress_dispatcher), + tc->initial_delivery, + &(tc->outgoing_id)); qdr_link_set_context(tc->outgoing, tc); - //the incoming link for egress is created once we receive the - //message which has the reply to address (and read buffers are - //granted at that point) - if (!tc->egress_dispatcher) { - tcp_connector_establish(tc); +} + + +static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *config, qd_server_t *server, qdr_delivery_t *initial_delivery) +{ + qdr_tcp_connection_t* tc = NEW(qdr_tcp_connection_t); + ZERO(tc); + if (initial_delivery) { + tc->egress_dispatcher = false; + tc->initial_delivery = initial_delivery; + } else { + tc->activate_timer = qd_timer(tcp_adaptor->core->qd, on_activate, tc); + tc->egress_dispatcher = true; + } + tc->ingress = false; + tc->context.context = tc; + tc->context.handler = &handle_connection_event; + tc->config = *config; + 
tc->server = server; + tc->conn_id = qd_server_allocate_connection_id(tc->server); + + // + // If this is the egress dispatcher, set up the core connection now. Otherwise, set up a physical + // raw connection and wait until we are running in that connection's context to set up the core + // connection. + // + if (tc->egress_dispatcher) + qdr_tcp_open_server_side_connection(tc); + else { + qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connecting to: %s", tc->conn_id, tc->config.host_port); + tc->socket = pn_raw_connection(); + pn_raw_connection_set_context(tc->socket, tc); + pn_proactor_raw_connect(qd_server_proactor(tc->server), tc->socket, tc->config.host_port); } return tc; @@ -1021,6 +1039,7 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] qdr_tcp_deliver Delivery event", tc->conn_id, tc->outgoing_id); if (tc->egress_dispatcher) { qdr_tcp_connection_egress(&(tc->config), tc->server, delivery); + return QD_DELIVERY_MOVED_TO_NEW_LINK; } else if (!tc->outstream) { tc->outstream = delivery; qdr_delivery_incref(delivery, "tcp_adaptor - new outstream"); @@ -1038,14 +1057,15 @@ static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_terminus_t *target = qdr_terminus(0); qdr_terminus_set_address(target, tc->reply_to); tc->incoming = qdr_link_first_attach(tc->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "tcp.egress.in", //const char *name, - 0, //const char *terminus_addr, - false, - NULL, - &(tc->incoming_id)); + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "tcp.egress.in", //const char *name, + 0, //const char *terminus_addr, + false, + NULL, + &(tc->incoming_id)); + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] Create Link to %s", tc->conn_id, tc->reply_to); qdr_link_set_context(tc->incoming, tc); 
//add this connection to those visible through management now that we have the global_id qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index d1f7273..779a1f0 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -39,6 +39,8 @@ ALLOC_DEFINE(qdr_connection_ref_t); ALLOC_DEFINE(qdr_connection_info_t); ALLOC_DEFINE(qdr_subscription_ref_t); +const uint64_t QD_DELIVERY_MOVED_TO_NEW_LINK = 999999999; + static void qdr_general_handler(void *context); qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 8c361a2..d481634 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -170,6 +170,9 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) to_new_link = true; break; } + if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) { + break; + } } while (settled != dlv->settled && !to_new_link); // oops missed the settlement send_complete = qdr_delivery_send_complete(dlv); @@ -209,6 +212,12 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) } } } + else if (new_disp == QD_DELIVERY_MOVED_TO_NEW_LINK) { + DEQ_REMOVE_HEAD(link->undelivered); + dlv->link_work = 0; + dlv->where = QDR_DELIVERY_NOWHERE; + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - moved from undelivered list to some other link"); + } else { qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - not send_complete"); @@ -232,7 +241,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) } sys_mutex_unlock(conn->work_lock); - if (new_disp) { + if (new_disp && new_disp != QD_DELIVERY_MOVED_TO_NEW_LINK) { // the remote sender-settle-mode forced us to pre-settle the // message. 
The core needs to know this, so we "fake" receiving a // settle+disposition update from the remote end of the link: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org