This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 4439fea DISPATCH-1981: TCP self test to force Q2 flow control 4439fea is described below commit 4439fea1fbcfcffb2a2d6657bcd13e4d6ec96bad Author: Chuck Rolke <c...@apache.org> AuthorDate: Mon Mar 8 11:29:10 2021 -0500 DISPATCH-1981: TCP self test to force Q2 flow control The echo server is rewritten to generate Q2 holdoff more reliably. The self test uses the connection-stall feature of the echo server. The server does not read the message content, beyond normal TCP window and python prefetch, so that the router gets into a Q2 block state. This closes #1060 --- tests/TCP_echo_server.py | 56 ++++++++-- tests/system_tests_tcp_adaptor.py | 227 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 263 insertions(+), 20 deletions(-) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 9e1a286..678132c 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -46,7 +46,6 @@ class ClientRecord(object): the inbound and outbound data list buffers for this socket's payload. """ - def __init__(self, address): self.addr = address self.inb = b'' @@ -87,7 +86,8 @@ def split_chunk_for_display(raw_bytes): class TcpEchoServer: - def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None): + def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None, + conn_stall=0.0, close_on_conn=False, close_on_data=False): """ Start echo server in separate thread @@ -104,6 +104,9 @@ class TcpEchoServer: self.echo_count = echo_count self.timeout = timeout self.logger = logger + self.conn_stall = conn_stall + self.close_on_conn = close_on_conn + self.close_on_data = close_on_data self.keep_running = True self.HOST = '127.0.0.1' self.is_running = False @@ -115,7 +118,11 @@ class TcpEchoServer: def run(self): """ - Run server in daemon thread + Run server in daemon thread. 
+ A single thread runs multiple sockets through selectors. + Note that timeouts and such are done in line and processing stops for + all sockets when one socket is timing out. For the intended one-at-a-time + test cases this works but it is not general solution for all cases. :return: """ try: @@ -160,11 +167,12 @@ class TcpEchoServer: for key, mask in events: if key.data is None: if key.fileobj is self.sock: - self.do_accept(key.fileobj, sel, self.logger) + self.do_accept(key.fileobj, sel, self.logger, self.conn_stall, self.close_on_conn) else: pass # Only listener 'sock' has None in opaque data field else: - total_echoed += self.do_service(key, mask, sel, self.logger) + n_echoed = self.do_service(key, mask, sel, self.logger, self.close_on_data) + total_echoed += n_echoed if n_echoed > 0 else 0 else: pass # select timeout. probably. @@ -176,14 +184,22 @@ class TcpEchoServer: self.is_running = False - def do_accept(self, sock, sel, logger): + def do_accept(self, sock, sel, logger, conn_stall, close_on_conn): conn, addr = sock.accept() logger.log('%s Accepted connection from %s:%d' % (self.prefix, addr[0], addr[1])) + if conn_stall > 0.0: + logger.log('%s Connection from %s:%d stall start' % (self.prefix, addr[0], addr[1])) + time.sleep(conn_stall) + logger.log('%s Connection from %s:%d stall end' % (self.prefix, addr[0], addr[1])) + if close_on_conn: + logger.log('%s Connection from %s:%d closing due to close_on_conn' % (self.prefix, addr[0], addr[1])) + conn.close() + return conn.setblocking(False) events = selectors.EVENT_READ | selectors.EVENT_WRITE sel.register(conn, events, data=ClientRecord(addr)) - def do_service(self, key, mask, sel, logger): + def do_service(self, key, mask, sel, logger, close_on_data): retval = 0 sock = key.fileobj data = key.data @@ -205,6 +221,11 @@ class TcpEchoServer: return 1 if recv_data: data.outb += recv_data + if close_on_data: + logger.log('%s Connection to %s:%d closed due to close_on_data' % (self.prefix, data.addr[0], 
data.addr[1])) + sel.unregister(sock) + sock.close() + return 0 logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data), split_chunk_for_display(recv_data))) sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) @@ -259,10 +280,22 @@ def main(argv): p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?", help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.') p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", - help='Timeout in seconds. Default value "0" disables timeouts') + help='Timeout in seconds. Default value "0.0" disables timeouts') p.add_argument('--log', '-l', action='store_true', help='Write activity log to console') + # Add controlled server misbehavior for testing conditions seen in the field + # Stall required to trigger Q2 testing for DISPATCH-1947 and improving test DISPATCH-1981 + p.add_argument('--connect-stall', type=float, default=0.0, const=1, nargs="?", + help='Accept connections but wait this many seconds before reading from socket. Default value "0.0" disables stall') + # Close on connect - exercises control paths scrutinized under DISPATCH-1968 + p.add_argument('--close-on-connect', + action='store_true', + help='Close client connection without reading from socket when listener connects. 
If stall is specified then stall before closing.') + # Close on data - exercises control paths scrutinized under DISPATCH-1968 + p.add_argument('--close-on-data', + action='store_true', + help='Close client connection as soon as data arrives.') del argv[0] args = p.parse_args(argv) @@ -282,6 +315,10 @@ def main(argv): if args.timeout < 0.0: raise Exception("Timeout must be greater than or equal to zero") + # timeout + if args.connect_stall < 0.0: + raise Exception("Connect-stall must be greater than or equal to zero") + signaller = GracefulExitSignaler() server = None @@ -291,7 +328,8 @@ def main(argv): print_to_console=args.log, save_for_dump=False) - server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger) + server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger, + args.connect_stall, args.close_on_connect, args.close_on_data) keep_running = True while keep_running: diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index d9b5f0a..bde08a3 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -22,6 +22,7 @@ from __future__ import division from __future__ import absolute_import from __future__ import print_function +import io import os import sys import time @@ -61,6 +62,29 @@ except ImportError: DISABLE_SELECTOR_REASON = "Python selectors module is not available on this platform." +# This code takes a wild guess how long an echo server must stall +# receiving input data before Q2 holdoff triggers in the host router +# on all the various CI systems out there. +Q2_DELAY_SECONDS = 1.0 + +# This code needs to know the size in bytes of the holdoff trigger threshold. +# Whitebox testing knows that the holdoff is specified in some number of +# buffers. What whitebox testing does not know how big the buffers are +# or the number of buffers or how many bytes are actually in each buffer. 
+# Today the holdoff is probably 128K bytes so use something bigger than that +# in the test to get the trigger to kick in. +# On top of that the echo server is undermined by having TCP window or python +# read the server socket in advance of the echo server asking it to. +# In a test case the adaptor logged writing almost 3MBytes +# 2021-02-26 19:11:20.831826 PN_RAW_CONNECTION_WRITTEN Wrote 8192 bytes. Total written 2777007 bytes +# well before the server started reading from the socket. +# 2021-02-26 19:11:21.534246 J0#206 TCP_TEST [] [] ECHO_SERVER TcpAdaptor NS_EC2_CONN_STALL Connection from 127.0.0.1:54410 stall end +# 2021-02-26 19:11:21.534801 J0#207 TCP_TEST [] [] ECHO_SERVER TcpAdaptor NS_EC2_CONN_STALL read from: 127.0.0.1:54410 len:1024: +# Giving the stalled server 10Mbytes seems to run the TCP window out of capacity +# so that it stops reading from the TcpConnector and Q2 finally kicks in. +Q2_TEST_MESSAGE_SIZE = 10000000 + + class Logger(): """ Record event logs as existing Logger. Also add: @@ -95,9 +119,9 @@ class Logger(): print(self) sys.stdout.flush() + @property def __str__(self): - lines = [] - lines.append(self.title) + lines = [self.title] for ts, msg in self.logs: lines.append("%s %s" % (ts, msg)) res = str('\n'.join(lines)) @@ -138,13 +162,20 @@ class TcpAdaptor(TestCase): # | | +------+ # +------------------------------------------------------------------+ # + # Router EC2 has naughty, misbehaving echo servers: + # * conn_stall - delays before reading socket to force triggering Q2 holdoff + # + # Routers EC2 has a TCP listener for conn_stall echo server. + # * Sending "large" messages through this listener should trigger Q2 holdoff + # on router EC1. + # * A similar listener on INTA does *not* trigger Q2 holdoff on EA1. # Allocate routers in this order router_order = ['INTA', 'INTB', 'INTC', 'EA1', 'EA2', 'EB1', 'EB2', 'EC1', 'EC2'] # List indexed in router_order # First listener in each router is normal AMQP for test setup and mgmt. 
- amqp_listener_ports = {} + amqp_listener_ports = {} # Each router listens for TCP where the tcp-address is the router name. # Each router has N listeners, one for the echo server connected to each router. @@ -157,7 +188,7 @@ class TcpAdaptor(TestCase): nodest_listener_ports = {} # Each router has a console listener - #http_listener_ports = {} + # http_listener_ports = {} # local timeout in seconds to wait for one echo client to finish echo_timeout = 30 @@ -168,6 +199,9 @@ class TcpAdaptor(TestCase): # Each router has an echo server to which it connects echo_servers = {} + # Special echo servers + echo_server_NS_CONN_STALL = None + @classmethod def setUpClass(cls): """Start a router""" @@ -194,7 +228,7 @@ class TcpAdaptor(TestCase): config = [ ('router', {'mode': mode, 'id': name}), ('listener', {'port': cls.amqp_listener_ports[name]}), - #('listener', {'port': cls.http_listener_ports[name], 'http': 'yes'}), + # ('listener', {'port': cls.http_listener_ports[name], 'http': 'yes'}), ('tcpListener', {'host': "0.0.0.0", 'port': cls.nodest_listener_ports[name], 'address': 'nodest', @@ -233,13 +267,16 @@ class TcpAdaptor(TestCase): tl_ports[tcp_listener] = cls.tester.get_port() cls.tcp_client_listener_ports[rtr] = tl_ports cls.nodest_listener_ports[rtr] = cls.tester.get_port() - #cls.http_listener_ports[rtr] = cls.tester.get_port() + # cls.http_listener_ports[rtr] = cls.tester.get_port() inter_router_port_AB = cls.tester.get_port() inter_router_port_BC = cls.tester.get_port() cls.INTA_edge_port = cls.tester.get_port() cls.INTB_edge_port = cls.tester.get_port() cls.INTC_edge_port = cls.tester.get_port() + cls.EC2_conn_stall_connector_port = cls.tester.get_port() + cls.INTA_conn_stall_listener_port = cls.tester.get_port() + cls.EC2_conn_stall_listener_port = cls.tester.get_port() cls.logger = Logger(title="TcpAdaptor-testClass", print_to_console=True, @@ -268,9 +305,14 @@ class TcpAdaptor(TestCase): p_out.append("INTA_edge_port=%d" % cls.INTA_edge_port) 
p_out.append("INTB_edge_port=%d" % cls.INTB_edge_port) p_out.append("INTC_edge_port=%d" % cls.INTC_edge_port) + p_out.append("EC2_conn_stall_connector_port%d" % cls.EC2_conn_stall_connector_port) + p_out.append("INTA_conn_stall_listener_port%d" % cls.INTA_conn_stall_listener_port) + p_out.append("EC2_conn_stall_listener_port%d" % cls.EC2_conn_stall_listener_port) + # write to log for line in p_out: cls.logger.log("TCP_TEST %s" % line) + # write to shell script with open("../setUpClass/TcpAdaptor-ports.sh", 'w') as o_file: for line in p_out: @@ -303,7 +345,9 @@ class TcpAdaptor(TestCase): # Launch the routers router('INTA', 'interior', [('listener', {'role': 'inter-router', 'port': inter_router_port_AB}), - ('listener', {'name': 'uplink', 'role': 'edge', 'port': cls.INTA_edge_port})]) + ('listener', {'name': 'uplink', 'role': 'edge', 'port': cls.INTA_edge_port}), + ('tcpListener', {'host': "0.0.0.0", 'port': cls.INTA_conn_stall_listener_port, + 'address': 'NS_EC2_CONN_STALL', 'siteId': cls.site})]) router('INTB', 'interior', [('connector', {'role': 'inter-router', 'port': inter_router_port_AB}), @@ -325,7 +369,11 @@ class TcpAdaptor(TestCase): router('EC1', 'edge', [('connector', {'name': 'uplink', 'role': 'edge', 'port': cls.INTC_edge_port})]) router('EC2', 'edge', - [('connector', {'name': 'uplink', 'role': 'edge', 'port': cls.INTC_edge_port})]) + [('connector', {'name': 'uplink', 'role': 'edge', 'port': cls.INTC_edge_port}), + ('tcpConnector', {'host': "127.0.0.1", 'port': cls.EC2_conn_stall_connector_port, + 'address': 'NS_EC2_CONN_STALL', 'siteId': cls.site}), + ('tcpListener', {'host': "0.0.0.0", 'port': cls.EC2_conn_stall_listener_port, + 'address': 'NS_EC2_CONN_STALL', 'siteId': cls.site})]) cls.INTA = cls.routers[0] cls.INTB = cls.routers[1] @@ -376,6 +424,20 @@ class TcpAdaptor(TestCase): assert server.is_running cls.echo_servers[rtr] = server + # start special naughty servers that misbehave on purpose + server_prefix = "ECHO_SERVER TcpAdaptor 
NS_EC2_CONN_STALL" + server_logger = Logger(title="TcpAdaptor", + print_to_console=cls.print_logs_server, + save_for_dump=False, + ofilename="../setUpClass/TcpAdaptor_echo_server_NS_CONN_STALL.log") + cls.logger.log("TCP_TEST Launching echo server '%s'" % server_prefix) + server = TcpEchoServer(prefix=server_prefix, + port=cls.EC2_conn_stall_connector_port, + logger=server_logger, + conn_stall=Q2_DELAY_SECONDS) + assert server.is_running + cls.echo_server_NS_CONN_STALL = server + # wait for server addresses (mobile ES_<rtr>) to propagate to all interior routers interior_rtrs = [rtr for rtr in cls.router_order if rtr.startswith('I')] found_all = False @@ -411,6 +473,9 @@ class TcpAdaptor(TestCase): if server is not None: cls.logger.log("TCP_TEST Stopping echo server %s" % rtr) server.wait() + if cls.echo_server_NS_CONN_STALL is not None: + cls.logger.log("TCP_TEST Stopping echo server NS_EC2_CONN_STALL") + cls.echo_server_NS_CONN_STALL.wait() super(TcpAdaptor, cls).tearDownClass() # @@ -423,7 +488,8 @@ class TcpAdaptor(TestCase): Provide wait/join to shut down. """ - def __init__(self, test_name, client_n, logger, client, server, size, count, print_client_logs): + def __init__(self, test_name, client_n, logger, client, server, size, count, print_client_logs, + port_override=None): """ Launch an echo client upon construction. 
@@ -448,7 +514,8 @@ class TcpAdaptor(TestCase): self.client_final = False # Each router has a listener for the echo server attached to every router - self.listener_port = TcpAdaptor.tcp_client_listener_ports[self.client][self.server] + self.listener_port = TcpAdaptor.tcp_client_listener_ports[self.client][self.server] \ + if port_override is None else port_override self.name = "%s_%s_%s_%s" % (self.test_name, self.client_n, self.size, self.count) self.client_prefix = "ECHO_CLIENT %s" % self.name @@ -604,8 +671,107 @@ class TcpAdaptor(TestCase): return result + def do_tcp_echo_singleton(self, test_name, client, server, size, count, echo_port): + """ + Launch a single echo client to the echo_port + Wait for completion. + Note that client and server do not define the port that the echo client + must connect to. That is overridden by echo_port. Still client and server + are passed to the EchoClientRunner + :param test_name test name + :param client router to which echo client attaches + :param server router that has the connector to the echo server + :param size size of message to be echoed + :param count number of messages to be echoed + :param echo_port the router network listener port + :return: None if success else error message for ctest + """ + self.logger.log("TCP_TEST %s Start do_tcp_echo_singleton" % test_name) + result = None + runners = [] + client_num = 0 + start_time = time.time() + + try: + # Launch the runner + log_msg = "TCP_TEST %s Running singleton %d %s->%s port %d, size=%d count=%d" % \ + (test_name, client_num, client.name, server.name, echo_port, size, count) + self.logger.log(log_msg) + runner = self.EchoClientRunner(test_name, client_num, self.logger, + client.name, server.name, size, count, + self.print_logs_client, port_override=echo_port) + runners.append(runner) + client_num += 1 + + # Loop until timeout, error, or completion + while result is None: + # Check for timeout + time.sleep(0.1) + elapsed = time.time() - start_time + if elapsed > 
self.echo_timeout: + result = "TCP_TEST TIMEOUT - local wait time exceeded" + break + # Make sure servers are still up + for rtr in TcpAdaptor.router_order: + es = TcpAdaptor.echo_servers[rtr] + if es.error is not None: + self.logger.log("TCP_TEST %s Server %s stopped with error: %s" % + (test_name, es.prefix, es.error)) + result = es.error + break + if es.exit_status is not None: + self.logger.log("TCP_TEST %s Server %s stopped with status: %s" % + (test_name, es.prefix, es.exit_status)) + result = es.exit_status + break + if result is not None: + break + + # Check for completion or runner error + complete = True + for runner in runners: + if not runner.client_final: + error = runner.client_error() + if error is not None: + self.logger.log("TCP_TEST %s Client %s stopped with error: %s" % + (test_name, runner.name, error)) + result = error + runner.client_final = True + break + status = runner.client_exit_status() + if status is not None: + self.logger.log("TCP_TEST %s Client %s stopped with status: %s" % + (test_name, runner.name, status)) + result = status + runner.client_final = True + break + running = runner.client_running() + if running: + complete = False + else: + self.logger.log("TCP_TEST %s Client %s exited normally" % + (test_name, runner.name)) + runner.client_final = True + if complete and result is None: + self.logger.log("TCP_TEST %s SUCCESS" % + test_name) + break + + # Wait/join all the runners + for runner in runners: + runner.wait() + + if result is not None: + self.logger.log("TCP_TEST %s failed: %s" % (test_name, result)) + + except Exception as exc: + result = "TCP_TEST %s failed. 
Exception: %s" % \ + (test_name, traceback.format_exc()) + + return result + # - # A series of 1-byte messsages, one at a time, to prove general connectivity + # Tests run by ctest # @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) def test_01_tcp_basic_connectivity(self): @@ -701,6 +867,45 @@ class TcpAdaptor(TestCase): assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) self.logger.log("TCP_TEST Stop %s SUCCESS" % name) + # Q2 holdoff + @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON) + def test_60_q2_holdoff(self): + name = "test_60_q2_holdoff" + self.logger.log("TCP_TEST Start %s" % name) + + # Verify going to EC2 + result = self.do_tcp_echo_singleton(name, self.EC2, self.EC2, Q2_TEST_MESSAGE_SIZE, + 1, self.EC2_conn_stall_listener_port) + if result is not None: + print(result) + sys.stdout.flush() + assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result) + + # search the router log file to verify Q2 was hit + for attempt in range(10): + block_ct = 0 + unblock_ct = 0 + lines = 0 + with io.open(self.EC2.logfile_path) as f: + for line in f: + lines += 1 + if 'client link blocked on Q2 limit' in line: + block_ct += 1 + if 'client link unblocked from Q2 limit' in line: + unblock_ct += 1 + if block_ct > 0 and block_ct == unblock_ct: + break + self.logger.log("Q2 holdoff from EC2 not detected. Wait for log file to update...") + time.sleep(0.1) + result = "failed" if block_ct == 0 or not block_ct == unblock_ct else "passed" + self.logger.log("TCP_TEST %s EC2 log scrape %s. 
block_ct=%d, unblock_ct=%d, lines=%d" % + (name, result, block_ct, unblock_ct, lines)) + self.assertTrue(block_ct > 0) + self.assertEqual(block_ct, unblock_ct) + + # Declare success + self.logger.log("TCP_TEST Stop %s SUCCESS" % name) + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org