This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 24df7cb DISPATCH-2261: close dispatcher out link on connector delete 24df7cb is described below commit 24df7cbef5ca51f9210c7f0afa3e5ef27ef307db Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Tue Oct 26 09:05:12 2021 -0400 DISPATCH-2261: close dispatcher out link on connector delete This closes #1398 --- src/adaptors/tcp_adaptor.c | 38 +++++--- tests/system_test.py | 2 +- tests/system_tests_tcp_adaptor.py | 190 +++++++++++++++++--------------------- 3 files changed, 111 insertions(+), 119 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 3e71611..57ab8bc 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -140,6 +140,7 @@ static void handle_disconnected(qdr_tcp_connection_t* conn); static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn); static void free_bridge_config(qd_tcp_bridge_t *config); static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc); +static void detach_links(qdr_tcp_connection_t *tc); // is the incoming byte window full @@ -195,6 +196,7 @@ static void on_activate(void *context) qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] on_activate", conn->conn_id); while (qdr_connection_process(conn->qdr_conn)) {} if (conn->egress_dispatcher && conn->connector_closed) { + detach_links(conn); qdr_connection_set_context(conn->qdr_conn, 0); qdr_connection_closed(conn->qdr_conn); conn->qdr_conn = 0; @@ -473,25 +475,18 @@ static void handle_disconnected(qdr_tcp_connection_t* conn) qd_message_set_receive_complete(qdr_delivery_message(conn->instream)); qdr_delivery_continue(tcp_adaptor->core, conn->instream, true); qdr_delivery_decref(tcp_adaptor->core, conn->instream, "tcp-adaptor.handle_disconnected - instream"); + conn->instream = 0; } if (conn->outstream) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - close outstream", conn->conn_id, 
conn->outgoing_id); qdr_delivery_decref(tcp_adaptor->core, conn->outstream, "tcp-adaptor.handle_disconnected - outstream"); + conn->outstream = 0; } - if (conn->incoming) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach incoming", - conn->conn_id, conn->incoming_id); - qdr_link_detach(conn->incoming, QD_LOST, 0); - } - if (conn->outgoing) { - qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, - "[C%"PRIu64"][L%"PRIu64"] handle_disconnected - detach outgoing", - conn->conn_id, conn->outgoing_id); - qdr_link_detach(conn->outgoing, QD_LOST, 0); - } + + detach_links(conn); + if (conn->initial_delivery) { qdr_delivery_remote_state_updated(tcp_adaptor->core, conn->initial_delivery, PN_RELEASED, true, 0, false); qdr_delivery_decref(tcp_adaptor->core, conn->initial_delivery, "tcp-adaptor.handle_disconnected - initial_delivery"); @@ -2060,3 +2055,22 @@ static void qdr_del_tcp_connection_CT(qdr_core_t *core, qdr_action_t *action, bo free_qdr_tcp_connection(conn); } } + + +static void detach_links(qdr_tcp_connection_t *conn) +{ + if (conn->incoming) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] detaching incoming link", + conn->conn_id, conn->incoming_id); + qdr_link_detach(conn->incoming, QD_LOST, 0); + conn->incoming = 0; + } + if (conn->outgoing) { + qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] detaching outgoing link", + conn->conn_id, conn->outgoing_id); + qdr_link_detach(conn->outgoing, QD_LOST, 0); + conn->outgoing = 0; + } +} diff --git a/tests/system_test.py b/tests/system_test.py index bb2406e..f0e5ee6 100755 --- a/tests/system_test.py +++ b/tests/system_test.py @@ -665,7 +665,7 @@ class Qdrouterd(Process): def check(): addrs = self.management.query(a_type).get_dicts() - rc = [a for a in addrs if address in a['name']] + rc = [a for a in addrs if a['name'].endswith(address)] count = 0 for a in rc: count += a['subscriberCount'] diff --git 
a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index fb2e186..7db9510 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -20,6 +20,7 @@ import io import json import os +import socket import sys import time import traceback @@ -990,42 +991,67 @@ class TcpAdaptorManagementTest(TestCase): cls.test_name = 'TCPMgmtTest' - # Here we have a simple barebones standalone router config. - config = [ - ('router', {'mode': 'standalone', - 'id': cls.test_name}), + # create edge and interior routers. The listener/connector will be on + # the edge router. It is expected that the edge will create proxy + # links to the interior and remove them when the test is done. + + cls.interior_edge_port = cls.tester.get_port() + cls.interior_mgmt_port = cls.tester.get_port() + cls.edge_mgmt_port = cls.tester.get_port() + + cls.tcp_server_port = cls.tester.get_port() + cls.tcp_listener_port = cls.tester.get_port() + + i_config = [ + ('router', {'mode': 'interior', + 'id': 'TCPMgmtTestInterior'}), ('listener', {'role': 'normal', - 'port': cls.tester.get_port()}), + 'port': cls.interior_mgmt_port}), + ('listener', {'role': 'edge', 'port': cls.interior_edge_port}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), ] - config = Qdrouterd.Config(config) - cls.router = cls.tester.qdrouterd(cls.test_name, config, wait=True) + config = Qdrouterd.Config(i_config) + cls.i_router = cls.tester.qdrouterd('TCPMgmtTestInterior', config, wait=False) - # Start the echo server. This is the server that the tcpConnector - # will be connecting to. 
- server_prefix = "ECHO_SERVER ES_%s" % cls.test_name - parent_path = os.path.dirname(os.getcwd()) - cls.logger = Logger(title="TcpAdaptor", - print_to_console=True, - save_for_dump=False, - ofilename=os.path.join(parent_path, "setUpClass/TcpAdaptor_echo_server.log")) - cls.echo_server = TcpEchoServer(prefix=server_prefix, - port=0, - logger=cls.logger) - # The router and the echo server are running at this point. - assert cls.echo_server.is_running - - cls.tcp_server_port = cls.echo_server.port - cls.tcp_listener_port = cls.tester.get_port() + e_config = [ + ('router', {'mode': 'edge', + 'id': 'TCPMgmtTestEdge'}), + ('listener', {'role': 'normal', + 'port': cls.edge_mgmt_port}), + ('connector', {'name': 'edge', 'role': 'edge', + 'port': cls.interior_edge_port}), + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ] + config = Qdrouterd.Config(e_config) + cls.e_router = cls.tester.qdrouterd('TCPMgmtTestEdge', config, + wait=False) + + cls.i_router.wait_ready() + cls.e_router.wait_ready() + + def _query_links_by_addr(self, router_mgmt, owning_addr): + oid = 'org.apache.qpid.dispatch.router.link' + attrs = ['owningAddr', 'linkDir'] + + links = [] + rc = router_mgmt.query(type=oid, attribute_names=attrs).results + for link in rc: + if link[0] is not None and link[0].endswith(owning_addr): + links.append(link) + return links @unittest.skipIf(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) def test_01_mgmt(self): """ - Create and delete TCP connectors and listeners + Create and delete TCP connectors and listeners. Ensure that the service + address is properly removed on the interior router. """ LISTENER_TYPE = 'org.apache.qpid.dispatch.tcpListener' CONNECTOR_TYPE = 'org.apache.qpid.dispatch.tcpConnector' - mgmt = self.router.management + mgmt = self.e_router.management # When starting out, there should be no tcpListeners or tcpConnectors. 
self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) @@ -1049,93 +1075,45 @@ class TcpAdaptorManagementTest(TestCase): self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results)) self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results)) - # Give a second for the tcpListener to start listening. - time.sleep(1) - # Start the echo client runner - client_runner_timeout = 3 - runner = EchoClientRunner(self.test_name, 1, self.logger, - None, None, 100, 1, - timeout=client_runner_timeout, - port_override=self.tcp_listener_port) - result = None + # now verify that the interior router sees the service address + # and two proxy links are created + self.i_router.wait_address(self.test_name, subscribers=1) + while True: + links = self._query_links_by_addr(self.i_router.management, + self.test_name) + if links: + # expect a single consumer link that represents + # the connector + self.assertEqual(1, len(links)) + self.assertEqual("out", links[0][1]) + break + time.sleep(0.25) - # Give some time for the client runner to finish up. 
- time.sleep(client_runner_timeout + 1) - - # Make sure servers are still up - if self.echo_server.error: - self.logger.log( - "TCP_TEST %s Server %s stopped with error: %s" % - (self.test_name, self.echo_server.prefix, - self.echo_server.error)) - result = self.echo_server.error - - if self.echo_server.exit_status: - self.logger.log( - "TCP_TEST %s Server %s stopped with status: %s" % - (self.test_name, self.echo_server.prefix, self.echo_server.exit_status)) - result = self.echo_server.exit_status - - self.assertIsNone(result) - - error = runner.client_error() - if error is not None: - self.logger.log("TCP_TEST %s Client %s stopped with error: %s" % - (self.test_name, runner.name, error)) - - self.assertIsNone(error) - status = runner.client_exit_status() - if status is not None: - self.logger.log("TCP_TEST %s Client %s stopped with status: %s" % - (self.test_name, runner.name, status)) - self.assertIsNone(status) - self.assertFalse(runner.client_running()) - - # Delete the connector and make sure the echo client fails. + # Delete the connector and listener out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name) self.assertIsNone(out) - - # Give some time for the connector to be deleted by the router. - # Deleting a connector also involves deleting existing connections - # that were made using the details from the connector. - # In this case, the router would have to drop the connection it - # already made to the echo server, so let's give it some time to - # do that. 
- time.sleep(2) - - client_runner_timeout = 2 - # Start the echo client runner - runner = EchoClientRunner(self.test_name, 1, self.logger, - None, None, 100, 1, - # Try for 2 seconds before timing out - timeout=client_runner_timeout, - port_override=self.tcp_listener_port) - time.sleep(client_runner_timeout + 1) - exit_status = runner.client_exit_status() - - if exit_status is not None: - # The test is a success, the echo client sender timed out - # because it did not receive anything back from the - # echo server because the connector to the echo server - # got deleted - self.logger.log("TCP_TEST %s Client %s timedout with error: %s" % - (self.test_name, runner.name, exit_status)) - else: - self.logger.log("ERROR: Connector not deleted") - self.assertIsNotNone(exit_status) - - # Now delete the tcpListener + self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results)) out = mgmt.delete(type=LISTENER_TYPE, name=listener_name) self.assertIsNone(out) + self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results)) - runner = EchoClientRunner(self.test_name, 1, self.logger, - None, None, 100, 1, - # Try for 2 seconds before timing out - timeout=client_runner_timeout, - port_override=self.tcp_listener_port) - time.sleep(client_runner_timeout + 1) - error = runner.client_error() - self.assertIn("ConnectionRefusedError", error) + # verify the service address and proxy links are no longer active on + # the interior router + self.i_router.wait_address_unsubscribed(self.test_name) + while True: + links = self._query_links_by_addr(self.i_router.management, + self.test_name) + if len(links) == 0: + break + time.sleep(0.25) + + # verify that clients can no longer connect to the listener + client_conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_conn.setblocking(True) + client_conn.settimeout(5) + with self.assertRaises(ConnectionRefusedError): + client_conn.connect(('127.0.0.1', self.tcp_listener_port)) + client_conn.close() if __name__ == '__main__': 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org