This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 304f9b4 DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener 304f9b4 is described below commit 304f9b419b5274ee7362ce66027b8a753097436d Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Mon Oct 25 15:32:43 2021 -0400 DISPATCH-2260: HTTP/1.x: fix deletion of httpConnector and httpListener This closes #1394 --- include/qpid/dispatch/protocol_adaptor.h | 12 ++ src/adaptors/http1/http1_adaptor.c | 19 +-- src/adaptors/http1/http1_client.c | 26 +++- src/adaptors/http1/http1_private.h | 6 +- src/adaptors/http1/http1_server.c | 73 +++++++----- src/router_core/connections.c | 6 +- src/router_core/router_core_private.h | 15 +-- tests/system_tests_http1_adaptor.py | 197 ++++++++++++++++++++++--------- 8 files changed, 237 insertions(+), 117 deletions(-) diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 654fe74..00551c2 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -323,6 +323,18 @@ void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor */ typedef enum { + QD_CONN_OPER_UP, + QD_CONN_OPER_DOWN, +} qd_conn_oper_status_t; + + +typedef enum { + QD_CONN_ADMIN_ENABLED, + QD_CONN_ADMIN_DELETED +} qd_conn_admin_status_t; + + +typedef enum { QD_LINK_ENDPOINT, ///< A link to a connected endpoint QD_LINK_CONTROL, ///< A link to a peer router for control messages QD_LINK_ROUTER, ///< A link to a peer router for routed messages diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c index 64b2a17..3828e89 100644 --- a/src/adaptors/http1/http1_adaptor.c +++ b/src/adaptors/http1/http1_adaptor.c @@ -405,7 +405,8 @@ void qdr_http1_q2_unblocked_handler(const qd_alloc_safe_ptr_t context) // -// Invoked by the core thread to wake an I/O thread for the connection +// Invoked by the core/mgmt thread to wake an I/O thread for the connection. +// Must be thread safe. // static void _core_connection_activate_CT(void *context, qdr_connection_t *conn) { @@ -670,21 +671,23 @@ static void qd_http1_adaptor_final(void *adaptor_context) qdr_http1_adaptor_t *adaptor = (qdr_http1_adaptor_t*) adaptor_context; qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); + qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections); + while (hconn) { + qdr_http1_connection_free(hconn); + hconn = DEQ_HEAD(adaptor->connections); + } qd_http_listener_t *li = DEQ_HEAD(adaptor->listeners); while (li) { - qd_http1_delete_listener(0, li); + DEQ_REMOVE_HEAD(qdr_http1_adaptor->listeners); + qd_http_listener_decref(li); li = DEQ_HEAD(adaptor->listeners); } qd_http_connector_t *ct = DEQ_HEAD(adaptor->connectors); while (ct) { - qd_http1_delete_connector(0, ct); + DEQ_REMOVE_HEAD(qdr_http1_adaptor->connectors); + qd_http_connector_decref(ct); ct = DEQ_HEAD(adaptor->connectors); } - qdr_http1_connection_t *hconn = DEQ_HEAD(adaptor->connections); - while (hconn) { - qdr_http1_connection_free(hconn); - hconn = DEQ_HEAD(adaptor->connections); - } sys_mutex_free(adaptor->lock); qdr_http1_adaptor = NULL; diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 9a6b845..fa73560 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -132,6 +132,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) ZERO(hconn); hconn->type = HTTP1_CONN_CLIENT; + hconn->admin_status = QD_CONN_ADMIN_ENABLED; + hconn->oper_status = QD_CONN_OPER_DOWN; hconn->qd_server = li->server; hconn->adaptor = qdr_http1_adaptor; hconn->handler_context.handler = &_handle_connection_events; @@ -219,8 +221,14 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void } else { qd_log(log, QD_LOG_TRACE, "Listener closed on %s", host_port); } + + sys_mutex_lock(qdr_http1_adaptor->lock); pn_listener_set_context(li->pn_listener, 0); li->pn_listener = 0; + DEQ_REMOVE(qdr_http1_adaptor->listeners, li); + sys_mutex_unlock(qdr_http1_adaptor->lock); + + qd_http_listener_decref(li); } break; } @@ -233,6 +241,9 @@ static void _handle_listener_events(pn_event_t *e, qd_server_t *qd_server, void // Management Agent API - Create // +// Note that this runs on the Management Agent thread, which may be running concurrently with the +// I/O and timer threads. +// qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) { qd_http_listener_t *li = qd_http_listener(qd->server, &_handle_listener_events); @@ -256,19 +267,20 @@ qd_http_listener_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http // Management Agent API - Delete // +// Note that this runs on the Management Agent thread, which may be running concurrently with the +// I/O and timer threads. +// void qd_http1_delete_listener(qd_dispatch_t *ignore, qd_http_listener_t *li) { if (li) { + qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleting HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port); + sys_mutex_lock(qdr_http1_adaptor->lock); if (li->pn_listener) { + // note that the proactor may immediately schedule the + // PN_LISTENER_CLOSED event on another thread... pn_listener_close(li->pn_listener); - li->pn_listener = 0; } - sys_mutex_lock(qdr_http1_adaptor->lock); - DEQ_REMOVE(qdr_http1_adaptor->listeners, li); sys_mutex_unlock(qdr_http1_adaptor->lock); - - qd_log(qdr_http1_adaptor->log, QD_LOG_INFO, "Deleted HttpListener for %s, %s:%s", li->config.address, li->config.host, li->config.port); - qd_http_listener_decref(li); } } @@ -317,6 +329,7 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) 0, // bind context 0); // bind token qdr_connection_set_context(hconn->qdr_conn, hconn); + hconn->oper_status = QD_CONN_OPER_UP; qd_log(hconn->adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] HTTP connection to client created", hconn->conn_id); @@ -460,6 +473,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi sys_mutex_unlock(qdr_http1_adaptor->lock); // at this point the core can no longer activate this connection + hconn->oper_status = QD_CONN_OPER_DOWN; if (hconn->out_link) { qdr_link_set_context(hconn->out_link, 0); qdr_link_detach(hconn->out_link, QD_LOST, 0); diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index 8296633..1d4a987 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -105,6 +105,7 @@ struct qdr_http1_request_base_t { }; DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t); + // A single HTTP adaptor connection. // struct qdr_http1_connection_t { @@ -118,6 +119,8 @@ struct qdr_http1_connection_t { uint64_t conn_id; qd_handler_context_t handler_context; h1_codec_connection_type_t type; + qd_conn_admin_status_t admin_status; + qd_conn_oper_status_t oper_status; struct { char *host; @@ -194,9 +197,6 @@ ALLOC_DECLARE(qdr_http1_connection_t); // http1_adaptor.c // -//int qdr_http1_write_out_data(qdr_http1_connection_t *hconn); -//void qdr_http1_write_buffer_list(qdr_http1_request_t *hreq, qd_buffer_list_t *blist); - void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn); void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_list_t *fifo, qd_buffer_list_t *blist, uintmax_t octets); void qdr_http1_enqueue_stream_data(qdr_http1_out_data_list_t *fifo, qd_message_stream_data_t *stream_data); diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index c606a46..a2e93bd 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -152,6 +152,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct ZERO(hconn); hconn->type = HTTP1_CONN_SERVER; + hconn->admin_status = QD_CONN_ADMIN_ENABLED; + hconn->oper_status = QD_CONN_OPER_UP; hconn->qd_server = qd->server; hconn->adaptor = qdr_http1_adaptor; hconn->handler_context.handler = &_handle_connection_events; @@ -216,6 +218,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct // Management Agent API - Create // +// Note that this runs on the Management Agent thread, which may be running concurrently with the +// I/O and timer threads. qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) { qd_http_connector_t *c = qd_http_connector(qd->server); @@ -257,6 +261,8 @@ qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_ht // Management Agent API - Delete // +// Note that this runs on the Management Agent thread, which may be running concurrently with the +// I/O and timer threads. void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct) { if (ct) { @@ -265,15 +271,17 @@ void qd_http1_delete_connector(qd_dispatch_t *ignored, qd_http_connector_t *ct) sys_mutex_lock(qdr_http1_adaptor->lock); DEQ_REMOVE(qdr_http1_adaptor->connectors, ct); qdr_http1_connection_t *hconn = (qdr_http1_connection_t*) ct->ctx; + qdr_connection_t *qdr_conn = 0; if (hconn) { + hconn->admin_status = QD_CONN_ADMIN_DELETED; hconn->server.connector = 0; ct->ctx = 0; - if (hconn->qdr_conn) - // have the core close this connection - qdr_core_close_connection(hconn->qdr_conn); + qdr_conn = hconn->qdr_conn; } sys_mutex_unlock(qdr_http1_adaptor->lock); + if (qdr_conn) + qdr_core_close_connection(qdr_conn); qd_http_connector_decref(ct); } } @@ -435,25 +443,28 @@ static void _do_reconnect(void *context) _process_request((_server_request_t*) DEQ_HEAD(hconn->requests)); - // Do not attempt to re-connect if the current request is still in - // progress. This happens when the server has closed the connection before - // the request message has fully arrived (!rx_complete). - // qdr_connection_process() will continue to invoke the - // qdr_http1_server_core_link_deliver callback until the request message is - // complete. - - // false positive: head request is removed before it is freed, null is passed - /* coverity[pass_freed_arg] */ - if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) { - qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, - "[C%"PRIu64"] Connecting to HTTP server...", conn_id); - sys_mutex_lock(qdr_http1_adaptor->lock); - hconn->raw_conn = pn_raw_connection(); - pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); - // this next call may immediately reschedule the connection on another I/O - // thread. After this call hconn may no longer be valid! - pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); - sys_mutex_unlock(qdr_http1_adaptor->lock); + if (hconn->admin_status == QD_CONN_ADMIN_ENABLED) { + + // Do not attempt to re-connect if the current request is still in + // progress. This happens when the server has closed the connection before + // the request message has fully arrived (!rx_complete). + // qdr_connection_process() will continue to invoke the + // qdr_http1_server_core_link_deliver callback until the request message is + // complete. + + // false positive: head request is removed before it is freed, null is passed + /* coverity[pass_freed_arg] */ + if (!_is_request_in_progress((_server_request_t*) DEQ_HEAD(hconn->requests))) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"] Connecting to HTTP server...", conn_id); + sys_mutex_lock(qdr_http1_adaptor->lock); + hconn->raw_conn = pn_raw_connection(); + pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); + // this next call may immediately reschedule the connection on another I/O + // thread. After this call hconn may no longer be valid! + pn_proactor_raw_connect(qd_server_proactor(hconn->qd_server), hconn->raw_conn, hconn->cfg.host_port); + sys_mutex_unlock(qdr_http1_adaptor->lock); + } } } @@ -584,7 +595,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi // bool reconnect = false; - if (hconn->qdr_conn) { + if (hconn->admin_status == QD_CONN_ADMIN_ENABLED && hconn->qdr_conn) { if (hconn->server.link_timeout == 0) { hconn->server.link_timeout = qd_timer_now() + LINK_TIMEOUT_MSEC; hconn->server.reconnect_pause = 0; @@ -600,13 +611,15 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi // prevent core activation sys_mutex_lock(qdr_http1_adaptor->lock); hconn->raw_conn = 0; - if (reconnect && hconn->server.reconnect_timer) + if (reconnect && hconn->server.reconnect_timer) { qd_timer_schedule(hconn->server.reconnect_timer, hconn->server.reconnect_pause); + sys_mutex_unlock(qdr_http1_adaptor->lock); + // do not manipulate hconn further as it may now be processed by the + // timer thread + return; + } sys_mutex_unlock(qdr_http1_adaptor->lock); - - // do not manipulate hconn further as it may now be processed by the - // timer thread - return; + break; } case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS: { _send_request_message((_server_request_t*) DEQ_HEAD(hconn->requests)); @@ -1722,8 +1735,10 @@ void qdr_http1_server_core_conn_close(qdr_http1_adaptor_t *adaptor, sys_mutex_unlock(qdr_http1_adaptor->lock); // the core thread can no longer activate this connection + hconn->oper_status = QD_CONN_OPER_DOWN; + _teardown_server_links(hconn); qdr_connection_closed(qdr_conn); - qdr_http1_close_connection(hconn, "Connection closed by management"); + qdr_http1_close_connection(hconn, error); // it is expected that this callback is the final callback before returning // from qdr_connection_process(). Free hconn when qdr_connection_process returns. diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 12812ad..840b82f 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -102,8 +102,8 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, conn->policy_spec = policy_spec; conn->link_capacity = link_capacity; conn->mask_bit = -1; - conn->admin_status = QDR_CONN_ADMIN_ENABLED; - conn->oper_status = QDR_CONN_OPER_UP; + conn->admin_status = QD_CONN_ADMIN_ENABLED; + conn->oper_status = QD_CONN_OPER_UP; DEQ_INIT(conn->links); DEQ_INIT(conn->work_list); DEQ_INIT(conn->streaming_link_pool); @@ -279,7 +279,7 @@ void qdr_close_connection_CT(qdr_core_t *core, qdr_connection_t *conn) { conn->closed = true; conn->error = qdr_error(QD_AMQP_COND_CONNECTION_FORCED, "Connection forced-closed by management request"); - conn->admin_status = QDR_CONN_ADMIN_DELETED; + conn->admin_status = QD_CONN_ADMIN_DELETED; //Activate the connection, so the I/O threads can finish the job. qdr_connection_activate_CT(core, conn); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 219290c..7fb5dc9 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -654,17 +654,6 @@ ALLOC_DECLARE(qdr_connection_info_t); DEQ_DECLARE(qdr_link_route_t, qdr_link_route_list_t); -typedef enum { - QDR_CONN_OPER_UP, -} qdr_conn_oper_status_t; - - -typedef enum { - QDR_CONN_ADMIN_ENABLED, - QDR_CONN_ADMIN_DELETED -} qdr_conn_admin_status_t; - - struct qdr_connection_t { DEQ_LINKS(qdr_connection_t); DEQ_LINKS_N(ACTIVATE, qdr_connection_t); @@ -692,8 +681,8 @@ struct qdr_connection_t { qdr_connection_info_t *connection_info; void *user_context; /* Updated from IO thread, use work_lock */ qdr_link_route_list_t conn_link_routes; // connection scoped link routes - qdr_conn_oper_status_t oper_status; - qdr_conn_admin_status_t admin_status; + qd_conn_oper_status_t oper_status; + qd_conn_admin_status_t admin_status; qdr_error_t *error; uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running. uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection. diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py index 2f2117b..986a265 100644 --- a/tests/system_tests_http1_adaptor.py +++ b/tests/system_tests_http1_adaptor.py @@ -43,85 +43,106 @@ from http1_tests import Http1CurlTestsMixIn class Http1AdaptorManagementTest(TestCase): """ - Test Creation and deletion of HTTP1 management entities + Test Creation and deletion of HTTP1 management entities. """ @classmethod def setUpClass(cls): super(Http1AdaptorManagementTest, cls).setUpClass() + cls.LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener' + cls.CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector' + cls.CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection' + + cls.interior_edge_port = cls.tester.get_port() + cls.interior_mgmt_port = cls.tester.get_port() + cls.edge_mgmt_port = cls.tester.get_port() + cls.http_server_port = cls.tester.get_port() cls.http_listener_port = cls.tester.get_port() - config = [ - ('router', {'mode': 'standalone', - 'id': 'HTTP1MgmtTest', - 'allowUnsettledMulticast': 'yes'}), + i_config = [ + ('router', {'mode': 'interior', + 'id': 'HTTP1MgmtTestInterior'}), ('listener', {'role': 'normal', - 'port': cls.tester.get_port()}), + 'port': cls.interior_mgmt_port}), + ('listener', {'role': 'edge', 'port': cls.interior_edge_port}), ('address', {'prefix': 'closest', 'distribution': 'closest'}), ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), ] + config = Qdrouterd.Config(i_config) + cls.i_router = cls.tester.qdrouterd('HTTP1MgmtTestInterior', config, wait=False) - config = Qdrouterd.Config(config) - cls.router = cls.tester.qdrouterd('HTTP1MgmtTest', config, wait=True) - - def test_01_mgmt(self): - """ - Create and delete HTTP1 connectors and listeners - """ - LISTENER_TYPE = 'org.apache.qpid.dispatch.httpListener' - CONNECTOR_TYPE = 'org.apache.qpid.dispatch.httpConnector' - CONNECTION_TYPE = 'org.apache.qpid.dispatch.connection' - - mgmt = self.router.management - self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) - - mgmt.create(type=CONNECTOR_TYPE, - name="ServerConnector", - attributes={'address': 'http1', - 'port': self.http_server_port, - 'protocolVersion': 'HTTP1'}) - - mgmt.create(type=LISTENER_TYPE, - name="ClientListener", - attributes={'address': 'http1', - 'port': self.http_listener_port, - 'protocolVersion': 'HTTP1'}) + e_config = [ + ('router', {'mode': 'edge', + 'id': 'HTTP1MgmtTestEdge'}), + ('listener', {'role': 'normal', + 'port': cls.edge_mgmt_port}), + ('connector', {'name': 'edge', 'role': 'edge', + 'port': cls.interior_edge_port}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(e_config) + cls.e_router = cls.tester.qdrouterd('HTTP1MgmtTestEdge', config, + wait=False) + + cls.i_router.wait_ready() + cls.e_router.wait_ready() + + def test_01_create_delete(self): + """ Create and delete HTTP1 connectors and listeners. The + connectors/listeners are created on the edge router. Verify that the + adaptor properly notifies the interior of the subscribers/producers. + """ + e_mgmt = self.e_router.management + self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + + e_mgmt.create(type=self.CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http1Service', + 'port': self.http_server_port, + 'protocolVersion': 'HTTP1'}) + + e_mgmt.create(type=self.LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http1Service', + 'port': self.http_listener_port, + 'protocolVersion': 'HTTP1'}) # verify the entities have been created and http traffic works - self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results)) - self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) count, error = http1_ping(sport=self.http_server_port, cport=self.http_listener_port) self.assertIsNone(error) self.assertEqual(1, count) + # now check the interior router for the closest/http1Service address + self.i_router.wait_address("closest/http1Service", subscribers=1) + # - # delete the connector and wait for the associated connection to be - # removed + # delete the connector and listener; wait for the associated connection + # to be removed # + e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener") + self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) - mgmt.delete(type=CONNECTOR_TYPE, name="ServerConnector") - self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) - - retry = 20 # 20 * 0.25 = 5 sec - hconns = 0 - while retry: - obj = mgmt.query(type=CONNECTION_TYPE, - attribute_names=["protocol"]) + # will hit test timeout on failure: + while True: + hconns = 0 + obj = e_mgmt.query(type=self.CONNECTION_TYPE, + attribute_names=["protocol"]) for item in obj.get_dicts(): if "http/1.x" in item["protocol"]: hconns += 1 if hconns == 0: break sleep(0.25) - retry -= 1 - hconns = 0 - - self.assertEqual(0, hconns, msg="HTTP connection not deleted") # When a connector is configured the router will periodically attempt # to connect to the server address. To prove that the connector has @@ -137,22 +158,88 @@ class Http1AdaptorManagementTest(TestCase): conn, addr = s.accept() s.close() + # Verify that the address is no longer bound on the interior + self.i_router.wait_address_unsubscribed("closest/http1Service") + # - # re-create the connector and verify it works + # re-create the connector and listener; verify it works # - mgmt.create(type=CONNECTOR_TYPE, - name="ServerConnector", - attributes={'address': 'http1', - 'port': self.http_server_port, - 'protocolVersion': 'HTTP1'}) + e_mgmt.create(type=self.CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http1Service', + 'port': self.http_server_port, + 'protocolVersion': 'HTTP1'}) + + e_mgmt.create(type=self.LISTENER_TYPE, + name="ClientListener", + attributes={'address': 'closest/http1Service', + 'port': self.http_listener_port, + 'protocolVersion': 'HTTP1'}) - self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) count, error = http1_ping(sport=self.http_server_port, cport=self.http_listener_port) self.assertIsNone(error) self.assertEqual(1, count) + self.i_router.wait_address("closest/http1Service", subscribers=1) + + e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + e_mgmt.delete(type=self.LISTENER_TYPE, name="ClientListener") + self.assertEqual(0, len(e_mgmt.query(type=self.LISTENER_TYPE).results)) + + def test_01_delete_active_connector(self): + """Delete an HTTP1 connector that is currently connected to a server. + Verify the connection is dropped. + """ + e_mgmt = self.e_router.management + self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + + e_mgmt.create(type=self.CONNECTOR_TYPE, + name="ServerConnector", + attributes={'address': 'closest/http1Service', + 'port': self.http_server_port, + 'protocolVersion': 'HTTP1'}) + + # verify the connector has been created and attach a dummy server + self.assertEqual(1, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind(("", self.http_server_port)) + server.setblocking(True) + server.settimeout(5) + server.listen(1) + conn, _ = server.accept() + server.close() + + # now check the interior router for the closest/http1Service address + self.i_router.wait_address("closest/http1Service", subscribers=1) + + # delete the connector + e_mgmt.delete(type=self.CONNECTOR_TYPE, name="ServerConnector") + self.assertEqual(0, len(e_mgmt.query(type=self.CONNECTOR_TYPE).results)) + + # expect socket to close + while True: + try: + rd, _, _ = select.select([conn], [], []) + except select.error as serror: + if serror[0] == errno.EINTR: + print("ignoring interrupt from select(): %s" % str(serror)) + continue + raise # assuming fatal... + if len(conn.recv(10)) == 0: + break + + conn.close() + + # Verify that the address is no longer bound on the interior + self.i_router.wait_address_unsubscribed("closest/http1Service") + class Http1AdaptorOneRouterTest(Http1OneRouterTestBase, CommonHttp1OneRouterTest): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org