This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 2d074ac41800f18a9549f97b82e49911027607ec Author: Chuck Rolke <c...@apache.org> AuthorDate: Tue Oct 27 15:04:06 2020 -0400 DISPATCH-1807: Add TCP protocol adaptor tests Rewrite do-nothing tcp_adaptor test * Mistakenly committed test cmake including it before it's ready * It's still not ready but is finding some form Improve echo server * For messages larger than 100 bytes only print the first and last 50 bytes to logs. Improve echo client * Send unique data for each message * Improve logging * Add timeout * Don't use socket after closing it --- tests/TCP_echo_client.py | 102 ++++++++++++++++++++++++--- tests/TCP_echo_server.py | 29 ++++++-- tests/system_tests_tcp_adaptor.py | 145 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 14 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 83f1b44..5f2a687 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -24,12 +24,38 @@ import os import selectors import socket import sys +import time import traceback import types from system_test import Logger -def main_except(host, port, size, count, logger): + +class EchoLogger(Logger): + def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): + self.prefix = prefix + ' ' if len(prefix) > 0 else '' + super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) + + def log(self, msg): + super(EchoLogger, self).log(self.prefix + msg) + + +def split_chunk_for_display(raw_bytes): + """ + Given some raw bytes, return a display string + Only show the beginning and end of largish (2xMAGIC_SIZE) arrays. + :param raw_bytes: + :return: display string + """ + MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too + if len(raw_bytes) > 2 * MAGIC_SIZE: + result = repr(raw_bytes[:MAGIC_SIZE]) + " ... 
" + repr(raw_bytes[-MAGIC_SIZE:]) + else: + result = repr(raw_bytes) + return result + + +def main_except(host, port, size, count, timeout, logger): ''' :param host: connect to this host :param port: connect to this port @@ -41,16 +67,36 @@ def main_except(host, port, size, count, logger): :return: ''' # Start up + start_time = time.time() logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count)) keep_going = True + total_sent = 0 + total_rcvd = 0 # outbound payload payload_out = [] out_list_idx = 0 # current _out array being sent out_byte_idx = 0 # next-to-send in current array out_ready_to_send = True - for i in range(count): - payload_out.append(bytearray([i & 255] * size)) + # Generate unique content for each message so you can tell where the message + # or fragment belongs in the whole stream. Chunks look like: + # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg' + # host: localhost + # port: 33333 + # index: 6 + # offset into message: 0 + MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too + for idx in range(count): + body_msg = "" + padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] + while len(body_msg) < size: + chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg)) + padlen = MAGIC_SIZE - len(chunk) + chunk += padchar * padlen + body_msg += chunk + if len(body_msg) > size: + body_msg = body_msg[:size] + payload_out.append(bytearray(body_msg.encode())) # incoming payloads payload_in = [] in_list_idx = 0 # current _in array being received @@ -70,11 +116,17 @@ def main_except(host, port, size, count, logger): # event loop while keep_going: - for key, mask in sel.select(timeout=1): + if timeout > 0.0: + elapsed = time.time() - start_time + if elapsed > timeout: + logger.log("Exiting due to timeout. 
Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd)) + break + for key, mask in sel.select(timeout=0.1): sock = key.fileobj if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) if recv_data: + total_rcvd = len(recv_data) payload_in[in_list_idx].extend(recv_data) if len(payload_in[in_list_idx]) == size: logger.log("Rcvd message %d" % in_list_idx) @@ -82,6 +134,16 @@ def main_except(host, port, size, count, logger): if in_list_idx == count: # Received all bytes of all chunks - done. keep_going = False + # Verify the received data + for idxc in range(count): + for idxs in range(size): + ob = payload_out[idxc][idxs] + ib = payload_in [idxc][idxs] + if ob != ib: + error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \ + % (idxc, idxs, repr(ob), repr(ib)) + logger.log(error) + raise Exception(error) else: out_ready_to_send = True sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) @@ -89,7 +151,7 @@ def main_except(host, port, size, count, logger): error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \ (size, len(payload_in[in_list_idx])) logger.log(error) - keep_going = False + raise Exception(error) else: pass # still accumulating a message else: @@ -98,6 +160,7 @@ def main_except(host, port, size, count, logger): if mask & selectors.EVENT_WRITE: if out_ready_to_send: n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] ) + total_sent += n_sent out_byte_idx += n_sent if out_byte_idx == size: logger.log("Sent message %d" % out_list_idx) @@ -124,6 +187,10 @@ def main(argv): help='Size of payload in bytes') p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?', help='Number of payloads to process') + p.add_argument('--name', + help='Optional logger prefix') + p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", + help='Timeout in seconds. 
Default value "0" disables timeouts') p.add_argument('--log', '-l', action='store_true', help='Write activity log to console') @@ -141,18 +208,35 @@ def main(argv): port = args.port # size + if args.size <= 0: + raise Exception("Size must be greater than zero") size = args.size # count + if args.count <= 0: + raise Exception("Count must be greater than zero") count = args.count + # name / prefix + prefix = args.name if args.name is not None else "ECHO_CLIENT" + + # timeout + if args.timeout < 0.0: + raise Exception("Timeout must be greater than or equal to zero") + # logging - logger = Logger(title = "TCP_echo_client host:%s port %d size:%d count:%d" % (host, port, size, count), - print_to_console = args.log, - save_for_dump = False) + logger = EchoLogger(prefix=prefix, + title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count), + print_to_console = args.log, + save_for_dump = False) - main_except(host, port, size, count, logger) + main_except(host, port, size, count, args.timeout, logger) return 0 + + except KeyboardInterrupt: + logger.log("Exiting due to KeyboardInterrupt") + return 0 + except Exception as e: traceback.print_exc() return 1 diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 44aefd7..28fc2ae 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -63,6 +63,21 @@ class EchoLogger(Logger): super(EchoLogger, self).log(self.prefix + msg) +def split_chunk_for_display(raw_bytes): + """ + Given some raw bytes, return a display string + Only show the beginning and end of largish (2xMAGIC_SIZE) arrays. + :param raw_bytes: + :return: display string + """ + MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too + if len(raw_bytes) > 2 * MAGIC_SIZE: + result = repr(raw_bytes[:MAGIC_SIZE]) + " ... 
" + repr(raw_bytes[-MAGIC_SIZE:]) + else: + result = repr(raw_bytes) + return result + + def main_except(sock, port, echo_count, timeout, logger): ''' :param lsock: socket to listen on @@ -128,23 +143,26 @@ def do_service(key, mask, sel, logger): recv_data = sock.recv(1024) if recv_data: data.outb += recv_data - logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), repr(recv_data))) + logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), + split_chunk_for_display(recv_data))) sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) else: logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1])) sel.unregister(sock) sock.close() + return 0 if mask & selectors.EVENT_WRITE: if data.outb: sent = sock.send(data.outb) retval += sent if sent > 0: - logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, repr(data.outb[:sent]))) + logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, + split_chunk_for_display(data.outb[:sent]))) else: logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1])) data.outb = data.outb[sent:] else: - logger.log('write event with no data' + str(data)) + #logger.log('write event with no data' + str(data)) sel.modify(sock, selectors.EVENT_READ, data=data) return retval @@ -183,7 +201,7 @@ def main(argv): # timeout if args.timeout < 0.0: - raise Exception("Timeout must be greater than zero") + raise Exception("Timeout must be greater than or equal to zero") # logging logger = EchoLogger(prefix = prefix, @@ -197,9 +215,10 @@ def main(argv): main_except(lsock, port, args.echo, args.timeout, logger) except KeyboardInterrupt: - pass + logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed) except Exception as e: + logger.log("Exiting due to Exception. 
Total echoed = %d" % total_echoed) traceback.print_exc() retval = 1 diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py new file mode 100644 index 0000000..8120937 --- /dev/null +++ b/tests/system_tests_tcp_adaptor.py @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + +import os +from time import sleep +from threading import Event +from threading import Timer + +from proton import Message, Timeout +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout +from system_test import AsyncTestReceiver +from system_test import AsyncTestSender +from system_test import Logger +from system_test import QdManager +from system_test import unittest +from system_test import Process +from system_tests_link_routes import ConnLinkRouteService +from test_broker import FakeBroker +from test_broker import FakeService +from proton.handlers import MessagingHandler +from proton.reactor import Container, DynamicNodeProperties +from proton.utils import BlockingConnection +from qpid_dispatch.management.client import Node +from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments +from subprocess import PIPE, STDOUT +import re + +class AddrTimer(object): + def __init__(self, parent): + self.parent = parent + + def on_timer_task(self, event): + self.parent.check_address() + + +class TcpAdaptorOneRouterEcho(TestCase): + """ + Run echo tests through a stand-alone router + """ + amqp_listener_port = None + tcp_client_listener_port = None + tcp_server_listener_port = None + + @classmethod + def setUpClass(cls): + """Start a router and echo server""" + super(TcpAdaptorOneRouterEcho, cls).setUpClass() + + def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None): + config = [ + ('router', {'mode': mode, 'id': name}), + ('listener', {'port': l_amqp, 'stripAnnotations': 'no'}), + ('tcpConnector', {"host": "127.0.0.1", + "port": l_tcp_server, + "address": addr, + "siteId": site}), + ('tcpListener', {"host": "0.0.0.0", + "port": l_tcp_client, + "address": addr, + "siteId": site}) + ] + + if extra: + 
config.append(extra) + config = Qdrouterd.Config(config) + cls.routers.append(cls.tester.qdrouterd(name, config, wait=True)) + + cls.routers = [] + + cls.amqp_listener_port = cls.tester.get_port() + cls.tcp_client_listener_port = cls.tester.get_port() + cls.tcp_server_listener_port = cls.tester.get_port() + + router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port, + cls.tcp_server_listener_port, "some_address", "best_site") + + cls.logger = Logger(title="TCP echo one router", print_to_console=True) + + @classmethod + def tearDownClass(cls): + pass + + def spawn_echo_server(self, port, expect=None): + cmd = ["TCP_echo_server.py", + "--port", str(port), + "--log"] + return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect, + universal_newlines=True) + + def spawn_echo_client(self, logger, host, port, size, count, expect=None): + if expect is None: + expect = Process.EXIT_OK + cmd = ["TCP_echo_client.py", + "--host", host, + "--port", str(port), + "--size", str(size), + "--count", str(count), + "--log"] + logger.log("Start client. 
cmd=%s" % str(cmd)) + return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect, + universal_newlines=True) + + def do_test_echo(self, logger, host, port, size, count): + # start echo client + client = self.spawn_echo_client(logger, host, port, size, count) + cl_text, cl_error = client.communicate(timeout=TIMEOUT) + if client.returncode: + raise Exception("Echo client failed size:%d, count:%d : %s %s" % + (size, count, cl_text, cl_error)) + + def test_01_tcp_echo_one_router(self): + # start echo server + #server = self.spawn_echo_server(self.tcp_server_listener_port) + + #for size in [1, 5, 10, 50, 100]: + # for count in [1, 5, 10, 50, 100]: + # self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" % + # (self.tcp_client_listener_port, size, count)) + # self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count) + #server.join() + pass + +if __name__== '__main__': + unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org