This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 7529eb2 DISPATCH-1816: avoid race between conn disconnect and activation 7529eb2 is described below commit 7529eb2d01de6dcf3c228a8dbd3927df342e1807 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Wed Nov 18 10:24:29 2020 -0500 DISPATCH-1816: avoid race between conn disconnect and activation --- src/adaptors/http1/http1_adaptor.c | 11 +- src/adaptors/http1/http1_client.c | 3 +- src/adaptors/http1/http1_private.h | 6 +- src/adaptors/http1/http1_server.c | 204 ++++++++++++++++-------------------- tests/system_tests_http1_adaptor.py | 7 +- 5 files changed, 106 insertions(+), 125 deletions(-) diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c index 9767b4c..9eb177f 100644 --- a/src/adaptors/http1/http1_adaptor.c +++ b/src/adaptors/http1/http1_adaptor.c @@ -94,8 +94,6 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn) DEQ_REMOVE(qdr_http1_adaptor->connections, hconn); qd_timer_free(hconn->server.reconnect_timer); hconn->server.reconnect_timer = 0; - qd_timer_free(hconn->server.activate_timer); - hconn->server.activate_timer = 0; rconn = hconn->raw_conn; hconn->raw_conn = 0; if (hconn->server.connector) { @@ -435,11 +433,10 @@ static void _core_connection_activate_CT(void *context, qdr_connection_t *conn) if (hconn->raw_conn) { pn_raw_connection_wake(hconn->raw_conn); activated = true; - } else if (hconn->type == HTTP1_CONN_SERVER) { - if (hconn->server.activate_timer) { - qd_timer_schedule(hconn->server.activate_timer, 0); - activated = true; - } + } else if (hconn->server.reconnect_timer) { + assert(hconn->type == HTTP1_CONN_SERVER); + qd_timer_schedule(hconn->server.reconnect_timer, 0); + activated = true; } } sys_mutex_unlock(qdr_http1_adaptor->lock); diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 1116fcf..ebc85eb 100644 --- a/src/adaptors/http1/http1_client.c +++ 
b/src/adaptors/http1/http1_client.c @@ -126,7 +126,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) hconn->handler_context.handler = &_handle_connection_events; hconn->handler_context.context = hconn; - hconn->client.next_msg_id = 99383939; + hconn->client.next_msg_id = 1; // configure the HTTP/1.x library @@ -382,6 +382,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qdr_connection_set_context(hconn->qdr_conn, 0); hconn->raw_conn = 0; sys_mutex_unlock(qdr_http1_adaptor->lock); + // at this point the core can no longer activate this connection if (hconn->out_link) { qdr_link_set_context(hconn->out_link, 0); diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index 18be8bc..7aaea63 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -158,9 +158,9 @@ struct qdr_http1_connection_t { // State if connected to an HTTP server struct { qd_http_connector_t *connector; - qd_timer_t *activate_timer; - qd_timer_t *reconnect_timer; - int reconnect_count; + qd_timer_t *reconnect_timer; + qd_timestamp_t link_timeout; + qd_duration_t reconnect_pause; } server; // Outgoing link (router ==> HTTP app) diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index e17eaa3..159e51b 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -90,9 +90,17 @@ ALLOC_DEFINE(_server_request_t); // -#define DEFAULT_CAPACITY 250 -#define RETRY_PAUSE_MSEC ((qd_duration_t)500) -#define MAX_RECONNECT 5 // 5 * 500 = 2.5 sec +#define DEFAULT_CAPACITY 250 + +// Reconnection logic time values: When the HTTP server disconnects this +// adaptor will attempt to reconnect. The reconnect interval increases by +// RETRY_PAUSE_MSEC with each reconnect failure until it hits the maximum of +// RETRY_MAX_PAUSE_MSEC. 
If the reconnection does not succeed after +// LINK_TIMEOUT_MSEC then the qdr_link_t's are detached to prevent client +// requests from arriving for a potentially dead server. +#define RETRY_PAUSE_MSEC ((qd_duration_t)500) +#define RETRY_MAX_PAUSE_MSEC ((qd_duration_t)3000) +#define LINK_TIMEOUT_MSEC ((qd_duration_t)2500) static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len); static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data); @@ -113,7 +121,6 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs); static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool cancelled); static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, void *context); static void _do_reconnect(void *context); -static void _do_activate(void *context); static void _server_response_msg_free(_server_request_t *req, _server_response_msg_t *rmsg); static void _server_request_free(_server_request_t *hreq); static void _write_pending_request(_server_request_t *req); @@ -125,9 +132,8 @@ static void _cancel_request(_server_request_t *req); //////////////////////////////////////////////////////// -// An HttpConnector has been created. Create an qdr_http_connection_t for it. -// Do not create a raw connection - this is done on demand when the router -// sends a delivery over the connector. +// An HttpConnector has been created. Create an qdr_http_connection_t and a +// qdr_connection_t for it. 
// static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ctor, qd_dispatch_t *qd, @@ -152,11 +158,7 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct // for initiating a connection to the server hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_reconnect, hconn); - // to run qdr_connection_process() when there is no raw connection to wake - hconn->server.activate_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_activate, hconn); - // Create the qdr_connection - qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, false, //bool is_authenticated, true, //bool opened, @@ -191,18 +193,10 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct info, 0, // bind context 0); // bind token - qdr_connection_set_context(hconn->qdr_conn, hconn); - - qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to server created", hconn->conn_id); // wait for the raw connection to come up before creating the in and out links - hconn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); - - sys_mutex_lock(qdr_http1_adaptor->lock); - DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn); - sys_mutex_unlock(qdr_http1_adaptor->lock); + qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to server created", hconn->conn_id); return hconn; } @@ -222,16 +216,23 @@ qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht qdr_http1_connection_t *hconn = _create_server_connection(c, qd, config); if (hconn) { - sys_mutex_lock(qdr_http1_adaptor->lock); - DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c); - sys_mutex_unlock(qdr_http1_adaptor->lock); - - // activate the raw connection. 
This connection may be scheduled on - // another thread by this call: qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Initiating connection to HTTP server %s", hconn->conn_id, hconn->cfg.host_port); - pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); + + // lock out the core activation thread. Up until this point the core + // thread cannot activate the qdr_connection_t since the + // qdr_connection_t context has not been set (see + // _core_connection_activate_CT in http1_adaptor.c). This keeps the + // core from attempting to schedule the connection until we finish + // setup. + sys_mutex_lock(qdr_http1_adaptor->lock); + DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn); + DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c); + qdr_connection_set_context(hconn->qdr_conn, hconn); + qd_timer_schedule(hconn->server.reconnect_timer, 0); + sys_mutex_unlock(qdr_http1_adaptor->lock); + // setup complete - core thread can activate the connection return c; } else { qd_http_connector_decref(c); @@ -348,7 +349,6 @@ static void _setup_server_links(qdr_http1_connection_t *hconn) // static void _teardown_server_links(qdr_http1_connection_t *hconn) { - // @TODO(kgiusti): should we PN_RELEASE all unsent outbound deliveries first? 
_server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests); while (hreq) { _server_request_free(hreq); @@ -358,12 +358,18 @@ static void _teardown_server_links(qdr_http1_connection_t *hconn) hconn->http_conn = 0; if (hconn->out_link) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Closing outgoing HTTP link", + hconn->conn_id, hconn->out_link_id); qdr_link_set_context(hconn->out_link, 0); qdr_link_detach(hconn->out_link, QD_CLOSED, 0); hconn->out_link = 0; } if (hconn->in_link) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Closing incoming HTTP link", + hconn->conn_id, hconn->in_link_id); qdr_link_set_context(hconn->in_link, 0); qdr_link_detach(hconn->in_link, QD_CLOSED, 0); hconn->in_link = 0; @@ -371,87 +377,51 @@ static void _teardown_server_links(qdr_http1_connection_t *hconn) } -// -// A note about reconnect and activate timer handlers: -// -// Both _do_reconnect and _do_activate are run via separate qd_timers. -// qd_timers execute on an arbitrary I/O thread and are guaranteed NOT to be -// run in parallel. The _do_activate timer is started by the core thread via -// _core_connection_activate_CT (http1_adaptor.c). The _do_reconnect timer is -// started by the I/O thread handling the server raw connection -// PN_RAW_CONNECTION_DISCONNECTED event. -// -// Since the server PN_RAW_CONNECTION_DISCONNECTED handler releases the raw -// connection and at a later point in time _do_reconnect creates a new raw -// connection it is guaranteed that _do_reconnect will NOT run in parallel with -// an I/O thread running the raw connection event handler (since no such raw -// connection exists when _do_reconnect is run) -// -// However it is possible to have a race between an I/O thread running -// _do_activate and an I/O thread running the raw connection event handler IF -// _do_activate runs _after_ _do_reconnect has run (since a new raw connection -// is created and can be immediately scheduled). 
-// To avoid this race the _do_reconnect handler cancels the _do_activate timer -// to prevent it from running immediately after _do_reconnect completes -// (remember: timer handlers never run in parallel). To prevent the core -// thread from rescheduling _do_activate after _do_reconnect runs a lock is -// held by _do_reconnect while it sets hconn->raw_conn. -// - - -// This adapter attempts to keep the connection to the server up as long as the -// connector is configured. This is called via a timer scheduled when the -// PN_CONNECTION_CLOSE event is handled. -// (See above note) +// Reconnection timer handler. +// This timer can be scheduled either by the event loop during the +// PN_RAW_CONNECTION_DISCONNECTED event or by the core thread via +// _core_connection_activate_CT in http1_adaptor.c. Since timers do not run +// concurrently this handler is guaranteed never to collide with itself. Once +// hconn->raw_conn is set to zero by the disconnect handler it will remain zero +// until this handler creates a new raw connection.
// static void _do_reconnect(void *context) { qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context; - bool connecting = false; + uint64_t conn_id = hconn->conn_id; - // lock out core activation - sys_mutex_lock(qdr_http1_adaptor->lock); - - // prevent _do_activate() from trying to process the qdr_connection after - // we schedule the raw connection on another thread - if (hconn->server.activate_timer) - qd_timer_cancel(hconn->server.activate_timer); - if (!hconn->raw_conn) { - connecting = true; - hconn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); - // this call may reschedule the connection on another I/O thread: - pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); - } + // while timers do not run concurrently it is possible to reschedule them + // via another thread while the timer handler is running, resulting in this + // handler running twice + if (hconn->raw_conn) return; // already ran - sys_mutex_unlock(qdr_http1_adaptor->lock); - - if (connecting) - qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, - "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id); -} + if (hconn->qdr_conn) { + // handle any qdr_connection_t processing requests that occurred since + // this raw connection dropped. + while (qdr_connection_process(hconn->qdr_conn)) + ; -// This adapter attempts to keep the qdr_connection_t open as it tries to -// re-connect to the server. During this reconnect phase there is no raw -// connection. If the core needs to process the qdr_connection_t when there is -// no raw connection to wake this zero-length timer handler will perform the -// connection processing (under the I/O thread). 
-// (See above note) -// -static void _do_activate(void *context) -{ - qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context; - if (!hconn->raw_conn && hconn->qdr_conn) { - while (qdr_connection_process(hconn->qdr_conn)) {} if (!hconn->qdr_conn) { // the qdr_connection_t has been closed qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP/1.x server connection closed", hconn->conn_id); qdr_http1_connection_free(hconn); + return; } } + + // lock out core activation + sys_mutex_lock(qdr_http1_adaptor->lock); + hconn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); + // this next call may immediately reschedule the connection on another I/O + // thread. After this call hconn may no longer be valid! + pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); + sys_mutex_unlock(qdr_http1_adaptor->lock); + + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"] Connecting to HTTP server...", conn_id); } @@ -469,7 +439,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi switch (pn_event_type(e)) { case PN_RAW_CONNECTION_CONNECTED: { - hconn->server.reconnect_count = 0; + hconn->server.link_timeout = 0; _setup_server_links(hconn); while (qdr_connection_process(hconn->qdr_conn)) {} break; @@ -493,11 +463,6 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", hconn->conn_id); - // prevent core from activating raw conn since it will no longer exist - // on return from the handler - sys_mutex_lock(qdr_http1_adaptor->lock); - hconn->raw_conn = 0; - sys_mutex_unlock(qdr_http1_adaptor->lock); // if the current request was not completed, cancel it. 
it's ok if // there are outstanding *response* deliveries in flight as long as the @@ -509,22 +474,36 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi _cancel_request(hreq); } + // + // reconnect to the server. Leave the links intact so pending requests + // are not aborted. If we fail to reconnect after LINK_TIMEOUT_MSECS + // drop the links to prevent additional request from arriving. + // + + bool reconnect = false; if (hconn->qdr_conn) { - // - // reconnect to the server. Leave the links intact so pending requests - // are not aborted. Once we've failed to reconnect after MAX_RECONNECT - // tries drop the links to prevent additional request from arriving. - // - qd_duration_t nap_time = RETRY_PAUSE_MSEC * hconn->server.reconnect_count; - if (hconn->server.reconnect_count == MAX_RECONNECT) { - qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - disconnecting...", hconn->conn_id); - _teardown_server_links(hconn); + if (hconn->server.link_timeout == 0) { + hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC; + hconn->server.reconnect_pause = 0; } else { - hconn->server.reconnect_count += 1; // increase next sleep interval + if ((qd_timer_now() - hconn->server.link_timeout) >= 0) + _teardown_server_links(hconn); + if (hconn->server.reconnect_pause < RETRY_MAX_PAUSE_MSEC) + hconn->server.reconnect_pause += RETRY_PAUSE_MSEC; } - qd_timer_schedule(hconn->server.reconnect_timer, nap_time); + reconnect = true; } - break; + + // prevent core activation + sys_mutex_lock(qdr_http1_adaptor->lock); + hconn->raw_conn = 0; + if (reconnect) + qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause); + sys_mutex_unlock(qdr_http1_adaptor->lock); + + // do not manipulate hconn further as it may now be processed by the + // timer thread + return; } case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", hconn->conn_id); @@ -1512,6 +1491,7 @@ void 
qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor, qdr_connection_set_context(hconn->qdr_conn, 0); hconn->qdr_conn = 0; sys_mutex_unlock(qdr_http1_adaptor->lock); + // the core thread can no longer activate this connection qdr_connection_closed(qdr_conn); qdr_http1_close_connection(hconn, "Connection closed by management"); diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py index 35c2b91..f630050 100644 --- a/tests/system_tests_http1_adaptor.py +++ b/tests/system_tests_http1_adaptor.py @@ -1217,7 +1217,8 @@ class Http1AdaptorEdge2EdgeTest(TestCase): repeat=2) # the adaptor will detach the links to the server if the connection # cannot be reestablished after 2.5 seconds. Restart the server before - # that occurrs to prevent client messages from being released + # that occurs to prevent client messages from being released with 503 + # status. server = TestServer(server_port=self.http_server11_port, client_port=self.http_listener11_port, tests=TESTS) @@ -1268,7 +1269,9 @@ class Http1AdaptorEdge2EdgeTest(TestCase): ] } - # Kill the server then issue client requests, expect 503 response + # Kill the server then issue client requests. These requests will be + # held on the server's outgoing links until they expire (2.5 seconds). + # At that point the client will receive a 503 response. server.wait() client = ThreadedTestClient(TESTS_FAIL, self.http_listener11_port) client.wait() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org