This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 393cb70 DISPATCH-2026: Add a system test to create and delete
tcpListener and tcpConnector via management. This closes #1091.
393cb70 is described below
commit 393cb70db18333d13169890c489400b2c6e5027d
Author: Ganesh Murthy <[email protected]>
AuthorDate: Tue Mar 30 15:45:23 2021 -0400
DISPATCH-2026: Add a system test to create and delete tcpListener and
tcpConnector via management. This closes #1091.
---
tests/TCP_echo_client.py | 1 +
tests/system_tests_tcp_adaptor.py | 356 ++++++++++++++++++++++++++++----------
2 files changed, 262 insertions(+), 95 deletions(-)
diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index f9bbf24..f517cb5 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -215,6 +215,7 @@ class TcpEchoClient:
except Exception:
self.error = "ERROR: exception : '%s'" % traceback.format_exc()
+ self.sock.close()
self.is_running = False
diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py
index 277cbbe..d5d0bac 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -88,6 +88,98 @@ Q2_DELAY_SECONDS = 1.0
# so that it stops reading from the TcpConnector and Q2 finally kicks in.
Q2_TEST_MESSAGE_SIZE = 10000000
+# local timeout in seconds to wait for one echo client to finish
+echo_timeout = 30
+
+
+#
+# Test concurrent clients
+#
+class EchoClientRunner():
+ """
+ Launch an echo client upon construction.
+ Provide poll interface for checking done/error.
+ Provide wait/join to shut down.
+ """
+
+ def __init__(self, test_name, client_n, logger, client, server, size,
+ count,
+ print_client_logs=True,
+ timeout=TIMEOUT,
+ port_override=None):
+ """
+ Launch an echo client upon construction.
+
+ :param test_name: Unique name for log file prefix
+ :param client_n: Client number for differentiating otherwise identical clients
+ :param logger: parent logger for logging test activity vs. client activity
+ :param client: router name to which the client connects
+ :param server: name whose address the client is targeting
+ :param size: length of messages in bytes
+ :param count: number of messages to be sent/verified
+ :param print_client_logs: verbosity switch
+ :return Null if success else string describing error
+ """
+ self.test_name = test_name
+ self.client_n = str(client_n)
+ self.logger = logger
+ self.client = client
+ self.server = server
+ self.size = size
+ self.count = count
+ self.timeout = timeout
+ self.print_client_logs = print_client_logs
+ self.client_final = False
+
+ # Each router has a listener for the echo server attached to every router
+ self.listener_port = TcpAdaptor.tcp_client_listener_ports[self.client][self.server] if port_override is None else port_override
+
+ self.name = "%s_%s_%s_%s" % \
+ (self.test_name, self.client_n, self.size, self.count)
+ self.client_prefix = "ECHO_CLIENT %s" % self.name
+ self.client_logger = Logger(title=self.client_prefix,
+ print_to_console=self.print_client_logs,
+ save_for_dump=False,
+ ofilename="../setUpClass/TcpAdaptor_echo_client_%s.log" % self.name)
+
+ try:
+ self.e_client = TcpEchoClient(prefix=self.client_prefix,
+ host='localhost',
+ port=self.listener_port,
+ size=self.size,
+ count=self.count,
+ timeout=self.timeout,
+ logger=self.client_logger)
+
+ except Exception as exc:
+ self.e_client.error = "TCP_TEST TcpAdaptor_runner_%s failed. Exception: %s" % \
+ (self.name, traceback.format_exc())
+ self.logger.log(self.e_client.error)
+ raise Exception(self.e_client.error)
+
+ def client_error(self):
+ return self.e_client.error
+
+ def client_exit_status(self):
+ return self.e_client.exit_status
+
+ def client_running(self):
+ return self.e_client.is_running
+
+ def wait(self):
+ # wait for client to exit
+ # Return None if successful wait/join/exit/close else error message
+ result = None
+ try:
+ self.e_client.wait()
+
+ except Exception as exc:
+ self.e_client.error = "TCP_TEST EchoClient %s failed. Exception: %s" % \
+ (self.name, traceback.format_exc())
+ self.logger.log(self.e_client.error)
+ result = self.e_client.error
+ return result
+
class TcpAdaptor(TestCase):
"""
@@ -151,9 +243,6 @@ class TcpAdaptor(TestCase):
# Each router has a console listener
# http_listener_ports = {}
- # local timeout in seconds to wait for one echo client to finish
- echo_timeout = 30
-
# TCP siteId for listeners and connectors
site = "mySite"
@@ -439,90 +528,6 @@ class TcpAdaptor(TestCase):
cls.echo_server_NS_CONN_STALL.wait()
super(TcpAdaptor, cls).tearDownClass()
- #
- # Test concurrent clients
- #
- class EchoClientRunner():
- """
- Launch an echo client upon construction.
- Provide poll interface for checking done/error.
- Provide wait/join to shut down.
- """
-
- def __init__(self, test_name, client_n, logger, client, server, size, count, print_client_logs,
- port_override=None):
- """
- Launch an echo client upon construction.
-
- :param test_name: Unique name for log file prefix
- :param client_n: Client number for differentiating otherwise identical clients
- :param logger: parent logger for logging test activity vs. client activity
- :param client: router name to which the client connects
- :param server: name whose address the client is targeting
- :param size: length of messages in bytes
- :param count: number of messages to be sent/verified
- :param print_client_logs: verbosity switch
- :return Null if success else string describing error
- """
- self.test_name = test_name
- self.client_n = str(client_n)
- self.logger = logger
- self.client = client
- self.server = server
- self.size = size
- self.count = count
- self.print_client_logs = print_client_logs
- self.client_final = False
-
- # Each router has a listener for the echo server attached to every router
- self.listener_port = TcpAdaptor.tcp_client_listener_ports[self.client][self.server] \
- if port_override is None else port_override
-
- self.name = "%s_%s_%s_%s" % (self.test_name, self.client_n, self.size, self.count)
- self.client_prefix = "ECHO_CLIENT %s" % self.name
- self.client_logger = Logger(title=self.client_prefix,
- print_to_console=self.print_client_logs,
- save_for_dump=False,
- ofilename="../setUpClass/TcpAdaptor_echo_client_%s.log" % self.name)
-
- try:
- self.e_client = TcpEchoClient(prefix=self.client_prefix,
- host='localhost',
- port=self.listener_port,
- size=self.size,
- count=self.count,
- timeout=TIMEOUT,
- logger=self.client_logger)
-
- except Exception as exc:
- self.e_client.error = "TCP_TEST TcpAdaptor_runner_%s failed. Exception: %s" % \
- (self.name, traceback.format_exc())
- self.logger.log(self.e_client.error)
- raise Exception(self.e_client.error)
-
- def client_error(self):
- return self.e_client.error
-
- def client_exit_status(self):
- return self.e_client.exit_status
-
- def client_running(self):
- return self.e_client.is_running
-
- def wait(self):
- # wait for client to exit
- # Return None if successful wait/join/exit/close else error message
- result = None
- try:
- self.e_client.wait()
-
- except Exception as exc:
- self.e_client.error = "TCP_TEST EchoClient %s failed. Exception: %s" % \
- (self.name, traceback.format_exc())
- self.logger.log(self.e_client.error)
- result = self.e_client.error
- return result
-
class EchoPair():
"""
For the concurrent tcp tests this class describes one of the client-
@@ -559,9 +564,10 @@ class TcpAdaptor(TestCase):
log_msg = "TCP_TEST %s Running pair %d %s->%s size=%d count=%d" % \
(test_name, client_num, client, server,
size, count)
self.logger.log(log_msg)
- runner = self.EchoClientRunner(test_name, client_num, self.logger,
- client, server, size, count,
- self.print_logs_client)
+ runner = EchoClientRunner(test_name, client_num,
+ self.logger,
+ client, server, size, count,
+ self.print_logs_client)
runners.append(runner)
client_num += 1
@@ -570,7 +576,7 @@ class TcpAdaptor(TestCase):
# Check for timeout
time.sleep(0.1)
elapsed = time.time() - start_time
- if elapsed > self.echo_timeout:
+ if elapsed > echo_timeout:
result = "TCP_TEST TIMEOUT - local wait time exceeded"
break
# Make sure servers are still up
@@ -658,9 +664,10 @@ class TcpAdaptor(TestCase):
log_msg = "TCP_TEST %s Running singleton %d %s->%s port %d, size=%d count=%d" % \
(test_name, client_num, client.name, server.name,
echo_port, size, count)
self.logger.log(log_msg)
- runner = self.EchoClientRunner(test_name, client_num, self.logger,
- client.name, server.name, size, count,
- self.print_logs_client, port_override=echo_port)
+ runner = EchoClientRunner(test_name, client_num, self.logger,
+ client.name, server.name, size, count,
+ self.print_logs_client,
+ port_override=echo_port)
runners.append(runner)
client_num += 1
@@ -669,7 +676,7 @@ class TcpAdaptor(TestCase):
# Check for timeout
time.sleep(0.1)
elapsed = time.time() - start_time
- if elapsed > self.echo_timeout:
+ if elapsed > echo_timeout:
result = "TCP_TEST TIMEOUT - local wait time exceeded"
break
# Make sure servers are still up
@@ -868,5 +875,164 @@ class TcpAdaptor(TestCase):
self.logger.log("TCP_TEST Stop %s SUCCESS" % name)
+class TcpAdaptorManagementTest(TestCase):
+ """
+ Test Creation and deletion of TCP management entities
+ """
+ @classmethod
+ def setUpClass(cls):
+ super(TcpAdaptorManagementTest, cls).setUpClass()
+
+ if DISABLE_SELECTOR_TESTS:
+ return
+
+ cls.tcp_server_port = cls.tester.get_port()
+ cls.tcp_listener_port = cls.tester.get_port()
+ cls.test_name = 'TCPMgmtTest'
+
+ # Here we have a simple barebones standalone router config.
+ config = [
+ ('router', {'mode': 'standalone',
+ 'id': cls.test_name}),
+ ('listener', {'role': 'normal',
+ 'port': cls.tester.get_port()}),
+ ]
+ config = Qdrouterd.Config(config)
+ cls.router = cls.tester.qdrouterd(cls.test_name, config, wait=True)
+
+ # Start the echo server. This is the server that the tcpConnector
+ # will be connecting to.
+ server_prefix = "ECHO_SERVER ES_%s" % cls.test_name
+ cls.logger = Logger(title="TcpAdaptor",
+ print_to_console=True,
+ save_for_dump=False,
+ ofilename="../setUpClass/TcpAdaptor_echo_server.log")
+ cls.echo_server = TcpEchoServer(prefix=server_prefix,
+ port=cls.tcp_server_port,
+ logger=cls.logger)
+ # The router and the echo server are running at this point.
+ assert cls.echo_server.is_running
+
+ @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
+ def test_01_mgmt(self):
+ """
+ Create and delete TCP connectors and listeners
+ """
+ LISTENER_TYPE = 'org.apache.qpid.dispatch.tcpListener'
+ CONNECTOR_TYPE = 'org.apache.qpid.dispatch.tcpConnector'
+
+ mgmt = self.router.management
+
+ # When starting out, there should be no tcpListeners or tcpConnectors.
+ self.assertEqual(0, len(mgmt.query(type=LISTENER_TYPE).results))
+ self.assertEqual(0, len(mgmt.query(type=CONNECTOR_TYPE).results))
+
+ connector_name = "ServerConnector"
+ listener_name = "ClientListener"
+
+ mgmt.create(type=LISTENER_TYPE,
+ name=listener_name,
+ attributes={'address': self.test_name,
+ 'port': self.tcp_listener_port,
+ 'host': '127.0.0.1'})
+ mgmt.create(type=CONNECTOR_TYPE,
+ name=connector_name,
+ attributes={'address': self.test_name,
+ 'port': self.tcp_server_port,
+ 'host': '127.0.0.1'})
+
+ # verify the entities have been created and tcp traffic works
+ self.assertEqual(1, len(mgmt.query(type=LISTENER_TYPE).results))
+ self.assertEqual(1, len(mgmt.query(type=CONNECTOR_TYPE).results))
+
+ # Give a second for the tcpListener to start listening.
+ time.sleep(1)
+ # Start the echo client runner
+ client_runner_timeout = 3
+ runner = EchoClientRunner(self.test_name, 1, self.logger,
+ None, None, 100, 1,
+ timeout=client_runner_timeout,
+ port_override=self.tcp_listener_port)
+ result = None
+
+ # Give some time for the client runner to finish up.
+ time.sleep(client_runner_timeout + 1)
+
+ # Make sure servers are still up
+ if self.echo_server.error:
+ self.logger.log(
+ "TCP_TEST %s Server %s stopped with error: %s" %
+ (self.test_name, self.echo_server.prefix,
+ self.echo_server.error))
+ result = self.echo_server.error
+
+ if self.echo_server.exit_status:
+ self.logger.log(
+ "TCP_TEST %s Server %s stopped with status: %s" %
+ (self.test_name, self.echo_server.prefix, self.echo_server.exit_status))
+ result = self.echo_server.exit_status
+
+ self.assertIsNone(result)
+
+ error = runner.client_error()
+ if error is not None:
+ self.logger.log("TCP_TEST %s Client %s stopped with error: %s" %
+ (self.test_name, runner.name, error))
+
+ self.assertIsNone(error)
+ status = runner.client_exit_status()
+ if status is not None:
+ self.logger.log("TCP_TEST %s Client %s stopped with status: %s" %
+ (self.test_name, runner.name, status))
+ self.assertIsNone(status)
+ self.assertFalse(runner.client_running())
+
+ # Delete the connector and make sure the echo client fails.
+ out = mgmt.delete(type=CONNECTOR_TYPE, name=connector_name)
+ self.assertIsNone(out)
+
+ # Give some time for the connector to be deleted by the router.
+ # Deleting a connector also involves deleting existing connections
+ # that were made using the details from the connector.
+ # In this case, the router would have to drop the connection it
+ # already made to the echo server, so let's give it some time to
+ # do that.
+ time.sleep(2)
+
+ client_runner_timeout = 2
+ # Start the echo client runner
+ runner = EchoClientRunner(self.test_name, 1, self.logger,
+ None, None, 100, 1,
+ # Try for 2 seconds before timing out
+ timeout=client_runner_timeout,
+ port_override=self.tcp_listener_port)
+ time.sleep(client_runner_timeout + 1)
+ exit_status = runner.client_exit_status()
+
+ if exit_status is not None:
+ # The test is a success, the echo client sender timed out
+ # because it did not receive anything back from the
+ # echo server because the connector to the echo server
+ # got deleted
+ self.logger.log("TCP_TEST %s Client %s timedout with error: %s" %
+ (self.test_name, runner.name, exit_status))
+ else:
+ self.logger.log("ERROR: Connector not deleted")
+ self.assertIsNotNone(exit_status)
+
+ # Now delete the tcpListener
+ out = mgmt.delete(type=LISTENER_TYPE, name=listener_name)
+ self.assertIsNone(out)
+
+ runner = EchoClientRunner(self.test_name, 1, self.logger,
+ None, None, 100, 1,
+ # Try for 2 seconds before timing out
+ timeout=client_runner_timeout,
+ port_override=self.tcp_listener_port)
+ time.sleep(client_runner_timeout + 1)
+ error = runner.client_error()
+ self.assertIn("ConnectionRefusedError", error)
+
+
if __name__ == '__main__':
unittest.main(main_module())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]