This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 4439fea  DISPATCH-1981: TCP self test to force Q2 flow control
4439fea is described below

commit 4439fea1fbcfcffb2a2d6657bcd13e4d6ec96bad
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Mon Mar 8 11:29:10 2021 -0500

    DISPATCH-1981: TCP self test to force Q2 flow control
    
    The echo server is rewritten to generate Q2 holdoff more reliably.
    
    The self test uses the connection-stall feature of the echo
    server. The server does not read the message content, beyond normal
    TCP window and python prefetch, so that the router gets into a
    Q2 block state.
    
    This closes #1060
---
 tests/TCP_echo_server.py          |  56 ++++++++--
 tests/system_tests_tcp_adaptor.py | 227 ++++++++++++++++++++++++++++++++++++--
 2 files changed, 263 insertions(+), 20 deletions(-)

diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index 9e1a286..678132c 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -46,7 +46,6 @@ class ClientRecord(object):
     the inbound and outbound data list buffers for this
     socket's payload.
     """
-
     def __init__(self, address):
         self.addr = address
         self.inb = b''
@@ -87,7 +86,8 @@ def split_chunk_for_display(raw_bytes):
 
 class TcpEchoServer:
 
-    def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, 
timeout=0.0, logger=None):
+    def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, 
timeout=0.0, logger=None,
+                 conn_stall=0.0, close_on_conn=False, close_on_data=False):
         """
         Start echo server in separate thread
 
@@ -104,6 +104,9 @@ class TcpEchoServer:
         self.echo_count = echo_count
         self.timeout = timeout
         self.logger = logger
+        self.conn_stall = conn_stall
+        self.close_on_conn = close_on_conn
+        self.close_on_data = close_on_data
         self.keep_running = True
         self.HOST = '127.0.0.1'
         self.is_running = False
@@ -115,7 +118,11 @@ class TcpEchoServer:
 
     def run(self):
         """
-        Run server in daemon thread
+        Run server in daemon thread.
+        A single thread runs multiple sockets through selectors.
+        Note that timeouts and such are done in line and processing stops for
+        all sockets when one socket is timing out. For the intended 
one-at-a-time
+        test cases this works but it is not a general solution for all cases.
         :return:
         """
         try:
@@ -160,11 +167,12 @@ class TcpEchoServer:
                     for key, mask in events:
                         if key.data is None:
                             if key.fileobj is self.sock:
-                                self.do_accept(key.fileobj, sel, self.logger)
+                                self.do_accept(key.fileobj, sel, self.logger, 
self.conn_stall, self.close_on_conn)
                             else:
                                 pass  # Only listener 'sock' has None in 
opaque data field
                         else:
-                            total_echoed += self.do_service(key, mask, sel, 
self.logger)
+                            n_echoed = self.do_service(key, mask, sel, 
self.logger, self.close_on_data)
+                            total_echoed += n_echoed if n_echoed > 0 else 0
                 else:
                     pass   # select timeout. probably.
 
@@ -176,14 +184,22 @@ class TcpEchoServer:
 
         self.is_running = False
 
-    def do_accept(self, sock, sel, logger):
+    def do_accept(self, sock, sel, logger, conn_stall, close_on_conn):
         conn, addr = sock.accept()
         logger.log('%s Accepted connection from %s:%d' % (self.prefix, 
addr[0], addr[1]))
+        if conn_stall > 0.0:
+            logger.log('%s Connection from %s:%d stall start' % (self.prefix, 
addr[0], addr[1]))
+            time.sleep(conn_stall)
+            logger.log('%s Connection from %s:%d stall end' % (self.prefix, 
addr[0], addr[1]))
+        if close_on_conn:
+            logger.log('%s Connection from %s:%d closing due to close_on_conn' 
% (self.prefix, addr[0], addr[1]))
+            conn.close()
+            return
         conn.setblocking(False)
         events = selectors.EVENT_READ | selectors.EVENT_WRITE
         sel.register(conn, events, data=ClientRecord(addr))
 
-    def do_service(self, key, mask, sel, logger):
+    def do_service(self, key, mask, sel, logger, close_on_data):
         retval = 0
         sock = key.fileobj
         data = key.data
@@ -205,6 +221,11 @@ class TcpEchoServer:
                 return 1
             if recv_data:
                 data.outb += recv_data
+                if close_on_data:
+                    logger.log('%s Connection to %s:%d closed due to 
close_on_data' % (self.prefix, data.addr[0], data.addr[1]))
+                    sel.unregister(sock)
+                    sock.close()
+                    return 0
                 logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, 
data.addr[0], data.addr[1], len(recv_data),
                                                                
split_chunk_for_display(recv_data)))
                 sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, 
data=data)
@@ -259,10 +280,22 @@ def main(argv):
     p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?",
                    help='Exit after echoing this many bytes. Default value "0" 
disables exiting on byte count.')
     p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, 
nargs="?",
-                   help='Timeout in seconds. Default value "0" disables 
timeouts')
+                   help='Timeout in seconds. Default value "0.0" disables 
timeouts')
     p.add_argument('--log', '-l',
                    action='store_true',
                    help='Write activity log to console')
+    # Add controlled server misbehavior for testing conditions seen in the 
field
+    # Stall required to trigger Q2 testing for DISPATCH-1947 and improving 
test DISPATCH-1981
+    p.add_argument('--connect-stall', type=float, default=0.0, const=1, 
nargs="?",
+                   help='Accept connections but wait this many seconds before 
reading from socket. Default value "0.0" disables stall')
+    # Close on connect - exercises control paths scrutinized under 
DISPATCH-1968
+    p.add_argument('--close-on-connect',
+                   action='store_true',
+                   help='Close client connection without reading from socket 
when listener connects. If stall is specified then stall before closing.')
+    # Close on data - exercises control paths scrutinized under DISPATCH-1968
+    p.add_argument('--close-on-data',
+                   action='store_true',
+                   help='Close client connection as soon as data arrives.')
     del argv[0]
     args = p.parse_args(argv)
 
@@ -282,6 +315,10 @@ def main(argv):
     if args.timeout < 0.0:
         raise Exception("Timeout must be greater than or equal to zero")
 
+    # connect-stall
+    if args.connect_stall < 0.0:
+        raise Exception("Connect-stall must be greater than or equal to zero")
+
     signaller = GracefulExitSignaler()
     server = None
 
@@ -291,7 +328,8 @@ def main(argv):
                         print_to_console=args.log,
                         save_for_dump=False)
 
-        server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger)
+        server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger,
+                               args.connect_stall, args.close_on_connect, 
args.close_on_data)
 
         keep_running = True
         while keep_running:
diff --git a/tests/system_tests_tcp_adaptor.py 
b/tests/system_tests_tcp_adaptor.py
index d9b5f0a..bde08a3 100644
--- a/tests/system_tests_tcp_adaptor.py
+++ b/tests/system_tests_tcp_adaptor.py
@@ -22,6 +22,7 @@ from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
 
+import io
 import os
 import sys
 import time
@@ -61,6 +62,29 @@ except ImportError:
     DISABLE_SELECTOR_REASON = "Python selectors module is not available on 
this platform."
 
 
+# This code takes a wild guess how long an echo server must stall
+# receiving input data before Q2 holdoff triggers in the host router
+# on all the various CI systems out there.
+Q2_DELAY_SECONDS = 1.0
+
+# This code needs to know the size in bytes of the holdoff trigger threshold.
+# Whitebox testing knows that the holdoff is specified in some number of
+# buffers. What whitebox testing does not know is how big the buffers are
+# or the number of buffers or how many bytes are actually in each buffer.
+# Today the holdoff is probably 128K bytes so use something bigger than that
+# in the test to get the trigger to kick in.
+# On top of that the echo server is undermined by having TCP window or python
+# read the server socket in advance of the echo server asking it to.
+# In a test case the adaptor logged writing almost 3MBytes
+#   2021-02-26 19:11:20.831826 PN_RAW_CONNECTION_WRITTEN Wrote 8192 bytes. 
Total written 2777007 bytes
+# well before the server started reading from the socket.
+#   2021-02-26 19:11:21.534246 J0#206 TCP_TEST [] [] ECHO_SERVER TcpAdaptor 
NS_EC2_CONN_STALL Connection from 127.0.0.1:54410 stall end
+#   2021-02-26 19:11:21.534801 J0#207 TCP_TEST [] [] ECHO_SERVER TcpAdaptor 
NS_EC2_CONN_STALL read from: 127.0.0.1:54410 len:1024:
+# Giving the stalled server 10Mbytes seems to run the TCP window out of 
capacity
+# so that it stops reading from the TcpConnector and Q2 finally kicks in.
+Q2_TEST_MESSAGE_SIZE = 10000000
+
+
 class Logger():
     """
     Record event logs as existing Logger. Also add:
@@ -95,9 +119,9 @@ class Logger():
         print(self)
         sys.stdout.flush()
 
+    @property
     def __str__(self):
-        lines = []
-        lines.append(self.title)
+        lines = [self.title]
         for ts, msg in self.logs:
             lines.append("%s %s" % (ts, msg))
         res = str('\n'.join(lines))
@@ -138,13 +162,20 @@ class TcpAdaptor(TestCase):
     #   |                                                                  |   
      +------+
     #   +------------------------------------------------------------------+
     #
+    # Router EC2 has naughty, misbehaving echo servers:
+    #  * conn_stall - delays before reading socket to force triggering Q2 
holdoff
+    #
+    # Router EC2 has a TCP listener for the conn_stall echo server.
+    #  * Sending "large" messages through this listener should trigger Q2 
holdoff
+    #    on router EC1.
+    #  * A similar listener on INTA does *not* trigger Q2 holdoff on EA1.
 
     # Allocate routers in this order
     router_order = ['INTA', 'INTB', 'INTC', 'EA1', 'EA2', 'EB1', 'EB2', 'EC1', 
'EC2']
 
     # List indexed in router_order
     # First listener in each router is normal AMQP for test setup and mgmt.
-    amqp_listener_ports       = {}
+    amqp_listener_ports = {}
 
     # Each router listens for TCP where the tcp-address is the router name.
     # Each router has N listeners, one for the echo server connected to each 
router.
@@ -157,7 +188,7 @@ class TcpAdaptor(TestCase):
     nodest_listener_ports = {}
 
     # Each router has a console listener
-    #http_listener_ports = {}
+    # http_listener_ports = {}
 
     # local timeout in seconds to wait for one echo client to finish
     echo_timeout = 30
@@ -168,6 +199,9 @@ class TcpAdaptor(TestCase):
     # Each router has an echo server to which it connects
     echo_servers = {}
 
+    # Special echo servers
+    echo_server_NS_CONN_STALL = None
+
     @classmethod
     def setUpClass(cls):
         """Start a router"""
@@ -194,7 +228,7 @@ class TcpAdaptor(TestCase):
             config = [
                 ('router', {'mode': mode, 'id': name}),
                 ('listener', {'port': cls.amqp_listener_ports[name]}),
-                #('listener', {'port': cls.http_listener_ports[name], 'http': 
'yes'}),
+                # ('listener', {'port': cls.http_listener_ports[name], 'http': 
'yes'}),
                 ('tcpListener', {'host': "0.0.0.0",
                                  'port': cls.nodest_listener_ports[name],
                                  'address': 'nodest',
@@ -233,13 +267,16 @@ class TcpAdaptor(TestCase):
                 tl_ports[tcp_listener] = cls.tester.get_port()
             cls.tcp_client_listener_ports[rtr] = tl_ports
             cls.nodest_listener_ports[rtr] = cls.tester.get_port()
-            #cls.http_listener_ports[rtr] = cls.tester.get_port()
+            # cls.http_listener_ports[rtr] = cls.tester.get_port()
 
         inter_router_port_AB  = cls.tester.get_port()
         inter_router_port_BC  = cls.tester.get_port()
         cls.INTA_edge_port = cls.tester.get_port()
         cls.INTB_edge_port = cls.tester.get_port()
         cls.INTC_edge_port = cls.tester.get_port()
+        cls.EC2_conn_stall_connector_port = cls.tester.get_port()
+        cls.INTA_conn_stall_listener_port = cls.tester.get_port()
+        cls.EC2_conn_stall_listener_port = cls.tester.get_port()
 
         cls.logger = Logger(title="TcpAdaptor-testClass",
                             print_to_console=True,
@@ -268,9 +305,14 @@ class TcpAdaptor(TestCase):
         p_out.append("INTA_edge_port=%d" % cls.INTA_edge_port)
         p_out.append("INTB_edge_port=%d" % cls.INTB_edge_port)
         p_out.append("INTC_edge_port=%d" % cls.INTC_edge_port)
+        p_out.append("EC2_conn_stall_connector_port%d" % 
cls.EC2_conn_stall_connector_port)
+        p_out.append("INTA_conn_stall_listener_port%d" % 
cls.INTA_conn_stall_listener_port)
+        p_out.append("EC2_conn_stall_listener_port%d" % 
cls.EC2_conn_stall_listener_port)
+
         # write to log
         for line in p_out:
             cls.logger.log("TCP_TEST %s" % line)
+
         # write to shell script
         with open("../setUpClass/TcpAdaptor-ports.sh", 'w') as o_file:
             for line in p_out:
@@ -303,7 +345,9 @@ class TcpAdaptor(TestCase):
         # Launch the routers
         router('INTA', 'interior',
                [('listener', {'role': 'inter-router', 'port': 
inter_router_port_AB}),
-                ('listener', {'name': 'uplink', 'role': 'edge', 'port': 
cls.INTA_edge_port})])
+                ('listener', {'name': 'uplink', 'role': 'edge', 'port': 
cls.INTA_edge_port}),
+                ('tcpListener', {'host': "0.0.0.0", 'port': 
cls.INTA_conn_stall_listener_port,
+                                 'address': 'NS_EC2_CONN_STALL', 'siteId': 
cls.site})])
 
         router('INTB', 'interior',
                [('connector', {'role': 'inter-router', 'port': 
inter_router_port_AB}),
@@ -325,7 +369,11 @@ class TcpAdaptor(TestCase):
         router('EC1', 'edge',
                [('connector', {'name': 'uplink', 'role': 'edge', 'port': 
cls.INTC_edge_port})])
         router('EC2', 'edge',
-               [('connector', {'name': 'uplink', 'role': 'edge', 'port': 
cls.INTC_edge_port})])
+               [('connector', {'name': 'uplink', 'role': 'edge', 'port': 
cls.INTC_edge_port}),
+                ('tcpConnector', {'host': "127.0.0.1", 'port': 
cls.EC2_conn_stall_connector_port,
+                                  'address': 'NS_EC2_CONN_STALL', 'siteId': 
cls.site}),
+                ('tcpListener', {'host': "0.0.0.0", 'port': 
cls.EC2_conn_stall_listener_port,
+                                 'address': 'NS_EC2_CONN_STALL', 'siteId': 
cls.site})])
 
         cls.INTA = cls.routers[0]
         cls.INTB = cls.routers[1]
@@ -376,6 +424,20 @@ class TcpAdaptor(TestCase):
             assert server.is_running
             cls.echo_servers[rtr] = server
 
+        # start special naughty servers that misbehave on purpose
+        server_prefix = "ECHO_SERVER TcpAdaptor NS_EC2_CONN_STALL"
+        server_logger = Logger(title="TcpAdaptor",
+                               print_to_console=cls.print_logs_server,
+                               save_for_dump=False,
+                               
ofilename="../setUpClass/TcpAdaptor_echo_server_NS_CONN_STALL.log")
+        cls.logger.log("TCP_TEST Launching echo server '%s'" % server_prefix)
+        server = TcpEchoServer(prefix=server_prefix,
+                               port=cls.EC2_conn_stall_connector_port,
+                               logger=server_logger,
+                               conn_stall=Q2_DELAY_SECONDS)
+        assert server.is_running
+        cls.echo_server_NS_CONN_STALL = server
+
         # wait for server addresses (mobile ES_<rtr>) to propagate to all 
interior routers
         interior_rtrs = [rtr for rtr in cls.router_order if 
rtr.startswith('I')]
         found_all = False
@@ -411,6 +473,9 @@ class TcpAdaptor(TestCase):
             if server is not None:
                 cls.logger.log("TCP_TEST Stopping echo server %s" % rtr)
                 server.wait()
+        if cls.echo_server_NS_CONN_STALL is not None:
+            cls.logger.log("TCP_TEST Stopping echo server NS_EC2_CONN_STALL")
+            cls.echo_server_NS_CONN_STALL.wait()
         super(TcpAdaptor, cls).tearDownClass()
 
     #
@@ -423,7 +488,8 @@ class TcpAdaptor(TestCase):
         Provide wait/join to shut down.
         """
 
-        def __init__(self, test_name, client_n, logger, client, server, size, 
count, print_client_logs):
+        def __init__(self, test_name, client_n, logger, client, server, size, 
count, print_client_logs,
+                     port_override=None):
             """
             Launch an echo client upon construction.
 
@@ -448,7 +514,8 @@ class TcpAdaptor(TestCase):
             self.client_final = False
 
             # Each router has a listener for the echo server attached to every 
router
-            self.listener_port = 
TcpAdaptor.tcp_client_listener_ports[self.client][self.server]
+            self.listener_port = 
TcpAdaptor.tcp_client_listener_ports[self.client][self.server] \
+                if port_override is None else port_override
 
             self.name = "%s_%s_%s_%s" % (self.test_name, self.client_n, 
self.size, self.count)
             self.client_prefix = "ECHO_CLIENT %s" % self.name
@@ -604,8 +671,107 @@ class TcpAdaptor(TestCase):
 
         return result
 
+    def do_tcp_echo_singleton(self, test_name, client, server, size, count, 
echo_port):
+        """
+        Launch a single echo client to the echo_port
+        Wait for completion.
+        Note that client and server do not define the port that the echo client
+        must connect to. That is overridden by echo_port. Still client and 
server
+        are passed to the EchoClientRunner
+        :param test_name test name
+        :param client router to which echo client attaches
+        :param server router that has the connector to the echo server
+        :param size size of message to be echoed
+        :param count number of messages to be echoed
+        :param echo_port the router network listener port
+        :return: None if success else error message for ctest
+        """
+        self.logger.log("TCP_TEST %s Start do_tcp_echo_singleton" % test_name)
+        result = None
+        runners = []
+        client_num = 0
+        start_time = time.time()
+
+        try:
+            # Launch the runner
+            log_msg = "TCP_TEST %s Running singleton %d %s->%s port %d, 
size=%d count=%d" % \
+                      (test_name, client_num, client.name, server.name, 
echo_port, size, count)
+            self.logger.log(log_msg)
+            runner = self.EchoClientRunner(test_name, client_num, self.logger,
+                                           client.name, server.name, size, 
count,
+                                           self.print_logs_client, 
port_override=echo_port)
+            runners.append(runner)
+            client_num += 1
+
+            # Loop until timeout, error, or completion
+            while result is None:
+                # Check for timeout
+                time.sleep(0.1)
+                elapsed = time.time() - start_time
+                if elapsed > self.echo_timeout:
+                    result = "TCP_TEST TIMEOUT - local wait time exceeded"
+                    break
+                # Make sure servers are still up
+                for rtr in TcpAdaptor.router_order:
+                    es = TcpAdaptor.echo_servers[rtr]
+                    if es.error is not None:
+                        self.logger.log("TCP_TEST %s Server %s stopped with 
error: %s" %
+                                        (test_name, es.prefix, es.error))
+                        result = es.error
+                        break
+                    if es.exit_status is not None:
+                        self.logger.log("TCP_TEST %s Server %s stopped with 
status: %s" %
+                                        (test_name, es.prefix, es.exit_status))
+                        result = es.exit_status
+                        break
+                if result is not None:
+                    break
+
+                # Check for completion or runner error
+                complete = True
+                for runner in runners:
+                    if not runner.client_final:
+                        error = runner.client_error()
+                        if error is not None:
+                            self.logger.log("TCP_TEST %s Client %s stopped 
with error: %s" %
+                                            (test_name, runner.name, error))
+                            result = error
+                            runner.client_final = True
+                            break
+                        status = runner.client_exit_status()
+                        if status is not None:
+                            self.logger.log("TCP_TEST %s Client %s stopped 
with status: %s" %
+                                            (test_name, runner.name, status))
+                            result = status
+                            runner.client_final = True
+                            break
+                        running = runner.client_running()
+                        if running:
+                            complete = False
+                        else:
+                            self.logger.log("TCP_TEST %s Client %s exited 
normally" %
+                                            (test_name, runner.name))
+                            runner.client_final = True
+                if complete and result is None:
+                    self.logger.log("TCP_TEST %s SUCCESS" %
+                                    test_name)
+                    break
+
+            # Wait/join all the runners
+            for runner in runners:
+                runner.wait()
+
+            if result is not None:
+                self.logger.log("TCP_TEST %s failed: %s" % (test_name, result))
+
+        except Exception as exc:
+            result = "TCP_TEST %s failed. Exception: %s" % \
+                (test_name, traceback.format_exc())
+
+        return result
+
     #
-    # A series of 1-byte messsages, one at a time, to prove general 
connectivity
+    # Tests run by ctest
     #
     @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
     def test_01_tcp_basic_connectivity(self):
@@ -701,6 +867,45 @@ class TcpAdaptor(TestCase):
         assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result)
         self.logger.log("TCP_TEST Stop %s SUCCESS" % name)
 
+    # Q2 holdoff
+    @SkipIfNeeded(DISABLE_SELECTOR_TESTS, DISABLE_SELECTOR_REASON)
+    def test_60_q2_holdoff(self):
+        name = "test_60_q2_holdoff"
+        self.logger.log("TCP_TEST Start %s" % name)
+
+        # Verify going to EC2
+        result = self.do_tcp_echo_singleton(name, self.EC2, self.EC2, 
Q2_TEST_MESSAGE_SIZE,
+                                            1, 
self.EC2_conn_stall_listener_port)
+        if result is not None:
+            print(result)
+            sys.stdout.flush()
+        assert result is None, "TCP_TEST Stop %s FAIL: %s" % (name, result)
+
+        # search the router log file to verify Q2 was hit
+        for attempt in range(10):
+            block_ct = 0
+            unblock_ct = 0
+            lines = 0
+            with io.open(self.EC2.logfile_path) as f:
+                for line in f:
+                    lines += 1
+                    if 'client link blocked on Q2 limit' in line:
+                        block_ct += 1
+                    if 'client link unblocked from Q2 limit' in line:
+                        unblock_ct += 1
+            if block_ct > 0 and block_ct == unblock_ct:
+                break
+            self.logger.log("Q2 holdoff from EC2 not detected. Wait for log 
file to update...")
+            time.sleep(0.1)
+        result = "failed" if block_ct == 0 or not block_ct == unblock_ct else 
"passed"
+        self.logger.log("TCP_TEST %s EC2 log scrape %s. block_ct=%d, 
unblock_ct=%d, lines=%d" %
+                        (name, result, block_ct, unblock_ct, lines))
+        self.assertTrue(block_ct > 0)
+        self.assertEqual(block_ct, unblock_ct)
+
+        # Declare success
+        self.logger.log("TCP_TEST Stop %s SUCCESS" % name)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to