This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 7529eb2  DISPATCH-1816: avoid race between conn disconnect and activation
7529eb2 is described below

commit 7529eb2d01de6dcf3c228a8dbd3927df342e1807
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Wed Nov 18 10:24:29 2020 -0500

    DISPATCH-1816: avoid race between conn disconnect and activation
---
 src/adaptors/http1/http1_adaptor.c  |  11 +-
 src/adaptors/http1/http1_client.c   |   3 +-
 src/adaptors/http1/http1_private.h  |   6 +-
 src/adaptors/http1/http1_server.c   | 204 ++++++++++++++++--------------------
 tests/system_tests_http1_adaptor.py |   7 +-
 5 files changed, 106 insertions(+), 125 deletions(-)

diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 9767b4c..9eb177f 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -94,8 +94,6 @@ void qdr_http1_connection_free(qdr_http1_connection_t *hconn)
             DEQ_REMOVE(qdr_http1_adaptor->connections, hconn);
             qd_timer_free(hconn->server.reconnect_timer);
             hconn->server.reconnect_timer = 0;
-            qd_timer_free(hconn->server.activate_timer);
-            hconn->server.activate_timer = 0;
             rconn = hconn->raw_conn;
             hconn->raw_conn = 0;
             if (hconn->server.connector) {
@@ -435,11 +433,10 @@ static void _core_connection_activate_CT(void *context, 
qdr_connection_t *conn)
         if (hconn->raw_conn) {
             pn_raw_connection_wake(hconn->raw_conn);
             activated = true;
-        } else if (hconn->type == HTTP1_CONN_SERVER) {
-            if (hconn->server.activate_timer) {
-                qd_timer_schedule(hconn->server.activate_timer, 0);
-                activated = true;
-            }
+        } else if (hconn->server.reconnect_timer) {
+            assert(hconn->type == HTTP1_CONN_SERVER);
+            qd_timer_schedule(hconn->server.reconnect_timer, 0);
+            activated = true;
         }
     }
     sys_mutex_unlock(qdr_http1_adaptor->lock);
diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index 1116fcf..ebc85eb 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -126,7 +126,7 @@ static qdr_http1_connection_t 
*_create_client_connection(qd_http_listener_t *li)
     hconn->handler_context.handler = &_handle_connection_events;
     hconn->handler_context.context = hconn;
 
-    hconn->client.next_msg_id = 99383939;
+    hconn->client.next_msg_id = 1;
 
     // configure the HTTP/1.x library
 
@@ -382,6 +382,7 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
         qdr_connection_set_context(hconn->qdr_conn, 0);
         hconn->raw_conn = 0;
         sys_mutex_unlock(qdr_http1_adaptor->lock);
+        // at this point the core can no longer activate this connection
 
         if (hconn->out_link) {
             qdr_link_set_context(hconn->out_link, 0);
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index 18be8bc..7aaea63 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -158,9 +158,9 @@ struct qdr_http1_connection_t {
     // State if connected to an HTTP server
     struct {
         qd_http_connector_t *connector;
-        qd_timer_t *activate_timer;
-        qd_timer_t *reconnect_timer;
-        int         reconnect_count;
+        qd_timer_t          *reconnect_timer;
+        qd_timestamp_t       link_timeout;
+        qd_duration_t        reconnect_pause;
     } server;
 
     // Outgoing link (router ==> HTTP app)
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index e17eaa3..159e51b 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -90,9 +90,17 @@ ALLOC_DEFINE(_server_request_t);
 //
 
 
-#define DEFAULT_CAPACITY 250
-#define RETRY_PAUSE_MSEC ((qd_duration_t)500)
-#define MAX_RECONNECT    5  // 5 * 500 = 2.5 sec
+#define DEFAULT_CAPACITY     250
+
+// Reconnection logic time values: When the HTTP server disconnects this
+// adaptor will attempt to reconnect. The reconnect interval increases by
+// RETRY_PAUSE_MSEC with each reconnect failure until it hits the maximum of
+// RETRY_MAX_PAUSE_MSEC. If the reconnection does not succeed after
+// LINK_TIMEOUT_MSEC then the qdr_link_t's are detached to prevent client
+// requests from arriving for a potentially dead server.
+#define RETRY_PAUSE_MSEC     ((qd_duration_t)500)
+#define RETRY_MAX_PAUSE_MSEC ((qd_duration_t)3000)
+#define LINK_TIMEOUT_MSEC    ((qd_duration_t)2500)
 
 static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, 
qd_buffer_list_t *blist, unsigned int len);
 static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, 
qd_message_stream_data_t *stream_data);
@@ -113,7 +121,6 @@ static void _server_rx_done_cb(h1_codec_request_state_t 
*hrs);
 static void _server_request_complete_cb(h1_codec_request_state_t *hrs, bool 
cancelled);
 static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, 
void *context);
 static void _do_reconnect(void *context);
-static void _do_activate(void *context);
 static void _server_response_msg_free(_server_request_t *req, 
_server_response_msg_t *rmsg);
 static void _server_request_free(_server_request_t *hreq);
 static void _write_pending_request(_server_request_t *req);
@@ -125,9 +132,8 @@ static void _cancel_request(_server_request_t *req);
 ////////////////////////////////////////////////////////
 
 
-// An HttpConnector has been created.  Create an qdr_http_connection_t for it.
-// Do not create a raw connection - this is done on demand when the router
-// sends a delivery over the connector.
+// An HttpConnector has been created.  Create a qdr_http1_connection_t and a
+// qdr_connection_t for it.
 //
 static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t 
*ctor,
                                                          qd_dispatch_t *qd,
@@ -152,11 +158,7 @@ static qdr_http1_connection_t 
*_create_server_connection(qd_http_connector_t *ct
     // for initiating a connection to the server
     hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, 
_do_reconnect, hconn);
 
-    // to run qdr_connection_process() when there is no raw connection to wake
-    hconn->server.activate_timer = qd_timer(qdr_http1_adaptor->core->qd, 
_do_activate, hconn);
-
     // Create the qdr_connection
-
     qdr_connection_info_t *info = qdr_connection_info(false, //bool            
 is_encrypted,
                                                       false, //bool            
 is_authenticated,
                                                       true,  //bool            
 opened,
@@ -191,18 +193,10 @@ static qdr_http1_connection_t 
*_create_server_connection(qd_http_connector_t *ct
                                             info,
                                             0,      // bind context
                                             0);     // bind token
-    qdr_connection_set_context(hconn->qdr_conn, hconn);
-
-    qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to 
server created", hconn->conn_id);
 
     // wait for the raw connection to come up before creating the in and out 
links
 
-    hconn->raw_conn = pn_raw_connection();
-    pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
-
-    sys_mutex_lock(qdr_http1_adaptor->lock);
-    DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
-    sys_mutex_unlock(qdr_http1_adaptor->lock);
+    qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to 
server created", hconn->conn_id);
 
     return hconn;
 }
@@ -222,16 +216,23 @@ qd_http_connector_t 
*qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht
 
     qdr_http1_connection_t *hconn = _create_server_connection(c, qd, config);
     if (hconn) {
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c);
-        sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-        // activate the raw connection. This connection may be scheduled on
-        // another thread by this call:
         qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                "[C%"PRIu64"] Initiating connection to HTTP server %s",
                hconn->conn_id, hconn->cfg.host_port);
-        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), 
hconn->raw_conn, hconn->cfg.host_port);
+
+        // lock out the core activation thread.  Up until this point the core
+        // thread cannot activate the qdr_connection_t since the
+        // qdr_connection_t context has not been set (see
+        // _core_connection_activate_CT in http1_adaptor.c). This keeps the
+        // core from attempting to schedule the connection until we finish
+        // setup.
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        DEQ_INSERT_TAIL(qdr_http1_adaptor->connections, hconn);
+        DEQ_INSERT_TAIL(qdr_http1_adaptor->connectors, c);
+        qdr_connection_set_context(hconn->qdr_conn, hconn);
+        qd_timer_schedule(hconn->server.reconnect_timer, 0);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+        // setup complete - core thread can activate the connection
         return c;
     } else {
         qd_http_connector_decref(c);
@@ -348,7 +349,6 @@ static void _setup_server_links(qdr_http1_connection_t 
*hconn)
 //
 static void _teardown_server_links(qdr_http1_connection_t *hconn)
 {
-    // @TODO(kgiusti): should we PN_RELEASE all unsent outbound deliveries 
first?
     _server_request_t *hreq = (_server_request_t*) DEQ_HEAD(hconn->requests);
     while (hreq) {
         _server_request_free(hreq);
@@ -358,12 +358,18 @@ static void _teardown_server_links(qdr_http1_connection_t 
*hconn)
     hconn->http_conn = 0;
 
     if (hconn->out_link) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Closing outgoing HTTP link",
+               hconn->conn_id, hconn->out_link_id);
         qdr_link_set_context(hconn->out_link, 0);
         qdr_link_detach(hconn->out_link, QD_CLOSED, 0);
         hconn->out_link = 0;
     }
 
     if (hconn->in_link) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Closing incoming HTTP link",
+               hconn->conn_id, hconn->in_link_id);
         qdr_link_set_context(hconn->in_link, 0);
         qdr_link_detach(hconn->in_link, QD_CLOSED, 0);
         hconn->in_link = 0;
@@ -371,87 +377,51 @@ static void _teardown_server_links(qdr_http1_connection_t 
*hconn)
 }
 
 
-//
-// A note about reconnect and activate timer handlers:
-//
-// Both _do_reconnect and _do_activate are run via separate qd_timers.
-// qd_timers execute on an arbitrary I/O thread and are guaranteed NOT to be
-// run in parallel.  The _do_activate timer is started by the core thread via
-// _core_connection_activate_CT (http1_adaptor.c).  The _do_reconnect timer is
-// started by the I/O thread handling the server raw connection
-// PN_RAW_CONNECTION_DISCONNECTED event.
-//
-// Since the server PN_RAW_CONNECTION_DISCONNECTED handler releases the raw
-// connection and at a later point in time _do_reconnect creates a new raw
-// connection it is guaranteed that _do_reconnect will NOT run in parallel with
-// an I/O thread running the raw connection event handler (since no such raw
-// connection exists when _do_reconnect is run)
-//
-// However it is possible to have a race between an I/O thread running
-// _do_activate and an I/O thread running the raw connection event handler IF
-// _do_activate runs _after_ _do_reconnect has run (since a new raw connection
-// is created and can be immediately scheduled).
-//
-// To avoid this race the _do_reconnect handler cancels the _do_activate timer
-// to prevent it from running immediately after _do_reconnect completes
-// (remember: timer handlers never run in parallel).  To prevent the core
-// thread from rescheduling _do_activate after _do_reconnect runs a lock is
-// held by _do_reconnect while it sets hconn->raw_conn.
-//
-
-
-// This adapter attempts to keep the connection to the server up as long as the
-// connector is configured.  This is called via a timer scheduled when the
-// PN_CONNECTION_CLOSE event is handled.
-// (See above note)
+// Reconnection timer handler.
+// This timer can be scheduled either by the event loop during the
+// PN_RAW_CONNECTION_DISCONNECTED event or by the core thread via
+// _core_connection_activate_CT in http1_adaptor.c.  Since timers do not run
+// concurrently this handler is guaranteed never to collide with itself. Once
+// hconn->raw_conn is set to zero by the disconnect handler it will remain zero
+// until this handler creates a new raw connection.
 //
 static void _do_reconnect(void *context)
 {
     qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
-    bool connecting = false;
+    uint64_t conn_id = hconn->conn_id;
 
-    // lock out core activation
-    sys_mutex_lock(qdr_http1_adaptor->lock);
-
-    // prevent _do_activate() from trying to process the qdr_connection after
-    // we schedule the raw connection on another thread
-    if (hconn->server.activate_timer)
-        qd_timer_cancel(hconn->server.activate_timer);
-    if (!hconn->raw_conn) {
-        connecting = true;
-        hconn->raw_conn = pn_raw_connection();
-        pn_raw_connection_set_context(hconn->raw_conn, 
&hconn->handler_context);
-        // this call may reschedule the connection on another I/O thread:
-        pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), 
hconn->raw_conn, hconn->cfg.host_port);
-    }
+    // while timers do not run concurrently it is possible to reschedule them
+    // via another thread while the timer handler is running, resulting in this
+    // handler running twice
+    if (hconn->raw_conn) return;  // already ran
 
-    sys_mutex_unlock(qdr_http1_adaptor->lock);
-
-    if (connecting)
-        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
-               "[C%"PRIu64"] Connecting to HTTP server...", hconn->conn_id);
-}
+    if (hconn->qdr_conn) {
 
+        // handle any qdr_connection_t processing requests that occurred since
+        // this raw connection dropped.
+        while (qdr_connection_process(hconn->qdr_conn))
+            ;
 
-// This adapter attempts to keep the qdr_connection_t open as it tries to
-// re-connect to the server.  During this reconnect phase there is no raw
-// connection.  If the core needs to process the qdr_connection_t when there is
-// no raw connection to wake this zero-length timer handler will perform the
-// connection processing (under the I/O thread).
-// (See above note)
-//
-static void _do_activate(void *context)
-{
-    qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) context;
-    if (!hconn->raw_conn && hconn->qdr_conn) {
-        while (qdr_connection_process(hconn->qdr_conn)) {}
         if (!hconn->qdr_conn) {
             // the qdr_connection_t has been closed
             qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
                    "[C%"PRIu64"] HTTP/1.x server connection closed", 
hconn->conn_id);
             qdr_http1_connection_free(hconn);
+            return;
         }
     }
+
+    // lock out core activation
+    sys_mutex_lock(qdr_http1_adaptor->lock);
+    hconn->raw_conn = pn_raw_connection();
+    pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
+    // this next call may immediately reschedule the connection on another I/O
+    // thread. After this call hconn may no longer be valid!
+    pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), 
hconn->raw_conn, hconn->cfg.host_port);
+    sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+           "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
 }
 
 
@@ -469,7 +439,7 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
     switch (pn_event_type(e)) {
 
     case PN_RAW_CONNECTION_CONNECTED: {
-        hconn->server.reconnect_count = 0;
+        hconn->server.link_timeout = 0;
         _setup_server_links(hconn);
         while (qdr_connection_process(hconn->qdr_conn)) {}
         break;
@@ -493,11 +463,6 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
 
         qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Connection closed", 
hconn->conn_id);
 
-        // prevent core from activating raw conn since it will no longer exist
-        // on return from the handler
-        sys_mutex_lock(qdr_http1_adaptor->lock);
-        hconn->raw_conn = 0;
-        sys_mutex_unlock(qdr_http1_adaptor->lock);
 
         // if the current request was not completed, cancel it.  it's ok if
         // there are outstanding *response* deliveries in flight as long as the
@@ -509,22 +474,36 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
             _cancel_request(hreq);
         }
 
+        //
+        // reconnect to the server. Leave the links intact so pending requests
+        // are not aborted.  If we fail to reconnect after LINK_TIMEOUT_MSEC
+        // drop the links to prevent additional requests from arriving.
+        //
+
+        bool reconnect = false;
         if (hconn->qdr_conn) {
-            //
-            // reconnect to the server. Leave the links intact so pending 
requests
-            // are not aborted.  Once we've failed to reconnect after 
MAX_RECONNECT
-            // tries drop the links to prevent additional request from 
arriving.
-            //
-            qd_duration_t nap_time = RETRY_PAUSE_MSEC * 
hconn->server.reconnect_count;
-            if (hconn->server.reconnect_count == MAX_RECONNECT) {
-                qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Server not responding - 
disconnecting...", hconn->conn_id);
-                _teardown_server_links(hconn);
+            if (hconn->server.link_timeout == 0) {
+                hconn->server.link_timeout = qd_timer_now() + 
LINK_TIMEOUT_MSEC;
+                hconn->server.reconnect_pause = 0;
             } else {
-                hconn->server.reconnect_count += 1;  // increase next sleep 
interval
+                if ((qd_timer_now() - hconn->server.link_timeout) >= 0)
+                    _teardown_server_links(hconn);
+                if (hconn->server.reconnect_pause < RETRY_MAX_PAUSE_MSEC)
+                    hconn->server.reconnect_pause += RETRY_PAUSE_MSEC;
             }
-            qd_timer_schedule(hconn->server.reconnect_timer, nap_time);
+            reconnect = true;
         }
-        break;
+
+        // prevent core activation
+        sys_mutex_lock(qdr_http1_adaptor->lock);
+        hconn->raw_conn = 0;
+        if (reconnect)
+            qd_timer_schedule(hconn->server.reconnect_timer, 
hconn->server.reconnect_pause);
+        sys_mutex_unlock(qdr_http1_adaptor->lock);
+
+        // do not manipulate hconn further as it may now be processed by the
+        // timer thread
+        return;
     }
     case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need write buffers", 
hconn->conn_id);
@@ -1512,6 +1491,7 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t 
*adaptor,
     qdr_connection_set_context(hconn->qdr_conn, 0);
     hconn->qdr_conn = 0;
     sys_mutex_unlock(qdr_http1_adaptor->lock);
+    // the core thread can no longer activate this connection
 
     qdr_connection_closed(qdr_conn);
     qdr_http1_close_connection(hconn, "Connection closed by management");
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index 35c2b91..f630050 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -1217,7 +1217,8 @@ class Http1AdaptorEdge2EdgeTest(TestCase):
                                     repeat=2)
         # the adaptor will detach the links to the server if the connection
         # cannot be reestablished after 2.5 seconds.  Restart the server before
-        # that occurrs to prevent client messages from being released
+        # that occurs to prevent client messages from being released with 503
+        # status.
         server = TestServer(server_port=self.http_server11_port,
                             client_port=self.http_listener11_port,
                             tests=TESTS)
@@ -1268,7 +1269,9 @@ class Http1AdaptorEdge2EdgeTest(TestCase):
             ]
         }
 
-        # Kill the server then issue client requests, expect 503 response
+        # Kill the server then issue client requests. These requests will be
+        # held on the server's outgoing links until they expire (2.5 seconds).
+        # At that point the client will receive a 503 response.
         server.wait()
         client = ThreadedTestClient(TESTS_FAIL, self.http_listener11_port)
         client.wait()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to