This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new e29ea39 DISPATCH-1807: Add self tests for tcp protocol adaptor e29ea39 is described below commit e29ea396acfab940e54db36b569a865720ac9f8a Author: Chuck Rolke <c...@apache.org> AuthorDate: Fri Oct 30 09:40:35 2020 -0400 DISPATCH-1807: Add self tests for tcp protocol adaptor * echo_server and echo_client modified to have a test class that runs the test proper. * tcp_adaptor test runs the client and server test classes in separate threads and not in separate processes. This closes #905 --- tests/TCP_echo_client.py | 422 +++++++++++++++++++++----------------- tests/TCP_echo_server.py | 357 +++++++++++++++++++------------- tests/system_tests_tcp_adaptor.py | 147 +++++++------ 3 files changed, 532 insertions(+), 394 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 5f2a687..57152e3 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -19,25 +19,33 @@ # under the License. # +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + import argparse import os import selectors +import signal import socket import sys +from threading import Thread import time import traceback import types from system_test import Logger +from system_test import TIMEOUT +class GracefulKiller: + kill_now = False + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) -class EchoLogger(Logger): - def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): - self.prefix = prefix + ' ' if len(prefix) > 0 else '' - super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) - - def log(self, msg): - super(EchoLogger, self).log(self.prefix + msg) + def exit_gracefully(self,signum, frame): + self.kill_now = True def split_chunk_for_display(raw_bytes): @@ -55,192 +63,240 @@ def split_chunk_for_display(raw_bytes): return result -def main_except(host, port, size, count, timeout, logger): - ''' - :param host: connect to this host - :param port: connect to this port - :param size: size of individual payload chunks in bytes - :param count: number of payload chunks - :param strategy: "1" Send one payload; # TODO - Recv one payload - :param logger: Logger() object - :return: - ''' - # Start up - start_time = time.time() - logger.log('Connecting to host:%s, port:%d, size:%d, count:%d' % (host, port, size, count)) - keep_going = True - total_sent = 0 - total_rcvd = 0 - - # outbound payload - payload_out = [] - out_list_idx = 0 # current _out array being sent - out_byte_idx = 0 # next-to-send in current array - out_ready_to_send = True - # Generate unique content for each message so you can tell where the message - # or fragment belongs in the whole stream. Chunks look like: - # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg' - # host: localhost - # port: 33333 - # index: 6 - # offset into message: 0 - MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo server, too - for idx in range(count): - body_msg = "" - padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] - while len(body_msg) < size: - chunk = "[%s:%d:%d:%d]" % (host, port, idx, len(body_msg)) - padlen = MAGIC_SIZE - len(chunk) - chunk += padchar * padlen - body_msg += chunk - if len(body_msg) > size: - body_msg = body_msg[:size] - payload_out.append(bytearray(body_msg.encode())) - # incoming payloads - payload_in = [] - in_list_idx = 0 # current _in array being received - for i in range(count): - payload_in.append(bytearray()) - - # set up connection - host_address = (host, port) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect(host_address) - sock.setblocking(False) - - # set up selector - sel = selectors.DefaultSelector() - sel.register(sock, - selectors.EVENT_READ | selectors.EVENT_WRITE) - - # event loop - while keep_going: - if timeout > 0.0: - elapsed = time.time() - start_time - if elapsed > timeout: - logger.log("Exiting due to timeout. Total sent= %d, total rcvd= %d" % (total_sent, total_rcvd)) - break - for key, mask in sel.select(timeout=0.1): - sock = key.fileobj - if mask & selectors.EVENT_READ: - recv_data = sock.recv(1024) - if recv_data: - total_rcvd = len(recv_data) - payload_in[in_list_idx].extend(recv_data) - if len(payload_in[in_list_idx]) == size: - logger.log("Rcvd message %d" % in_list_idx) - in_list_idx += 1 - if in_list_idx == count: - # Received all bytes of all chunks - done. - keep_going = False - # Verify the received data - for idxc in range(count): - for idxs in range(size): - ob = payload_out[idxc][idxs] - ib = payload_in [idxc][idxs] - if ob != ib: - error = "CRITICAL Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \ - % (idxc, idxs, repr(ob), repr(ib)) - logger.log(error) - raise Exception(error) +class TcpEchoClient(): + + def __init__(self, prefix, host, port, size, count, timeout, logger): + ''' + :param host: connect to this host + :param port: connect to this port + :param size: size of individual payload chunks in bytes + :param count: number of payload chunks + :param strategy: "1" Send one payload; # TODO + Recv one payload + :param logger: Logger() object + :return: + ''' + # Start up + self.sock = None + self.prefix = prefix + self.host = host + self.port = port + self.size = size + self.count = count + self.timeout = timeout + self.logger = logger + self.keep_running = True + self.is_running = False + self.exit_status = None + self.error = None + self._thread = Thread(target=self.run) + self._thread.daemon = True + self._thread.start() + + def run(self): + try: + start_time = time.time() + self.is_running = True + self.logger.log('%s Connecting to host:%s, port:%d, size:%d, count:%d' % + (self.prefix, self.host, self.port, self.size, self.count)) + total_sent = 0 + total_rcvd = 0 + + # outbound payload + payload_out = [] + out_list_idx = 0 # current _out array being sent + out_byte_idx = 0 # next-to-send in current array + out_ready_to_send = True + # Generate unique content for each message so you can tell where the message + # or fragment belongs in the whole stream. Chunks look like: + # b'[localhost:33333:6:0]ggggggggggggggggggggggggggggg' + # host: localhost + # port: 33333 + # index: 6 + # offset into message: 0 + CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too + for idx in range(self.count): + body_msg = "" + padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] + while len(body_msg) < self.size: + chunk = "[%s:%d:%d:%d]" % (self.host, self.port, idx, len(body_msg)) + padlen = CONTENT_CHUNK_SIZE - len(chunk) + chunk += padchar * padlen + body_msg += chunk + if len(body_msg) > self.size: + body_msg = body_msg[:self.size] + payload_out.append(bytearray(body_msg.encode())) + # incoming payloads + payload_in = [] + in_list_idx = 0 # current _in array being received + for i in range(self.count): + payload_in.append(bytearray()) + + # set up connection + host_address = (self.host, self.port) + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect(host_address) + self.sock.setblocking(False) + + # set up selector + sel = selectors.DefaultSelector() + sel.register(self.sock, + selectors.EVENT_READ | selectors.EVENT_WRITE) + + # event loop + while self.keep_running: + if self.timeout > 0.0: + elapsed = time.time() - start_time + if elapsed > self.timeout: + self.exit_status = "%s Exiting due to timeout. Total sent= %d, total rcvd= %d" % \ + (self.prefix, total_sent, total_rcvd) + break + for key, mask in sel.select(timeout=0.1): + sock = key.fileobj + if mask & selectors.EVENT_READ: + recv_data = sock.recv(1024) + if recv_data: + total_rcvd = len(recv_data) + payload_in[in_list_idx].extend(recv_data) + if len(payload_in[in_list_idx]) == self.size: + self.logger.log("%s Rcvd message %d" % (self.prefix, in_list_idx)) + in_list_idx += 1 + if in_list_idx == self.count: + # Received all bytes of all chunks - done. + self.keep_running = False + # Verify the received data + for idxc in range(self.count): + for idxs in range(self.size): + ob = payload_out[idxc][idxs] + ib = payload_in [idxc][idxs] + if ob != ib: + self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \ + % (self.prefix, idxc, idxs, repr(ob), repr(ib)) + break + else: + out_ready_to_send = True + sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) + elif len(payload_in[in_list_idx]) > self.size: + self.error = "ERROR Received message too big. Expected:%d, actual:%d" % \ + (self.size, len(payload_in[in_list_idx])) + break + else: + pass # still accumulating a message + else: + # socket closed + self.keep_running = False + if self.keep_running and mask & selectors.EVENT_WRITE: + if out_ready_to_send: + n_sent = self.sock.send( payload_out[out_list_idx][out_byte_idx:] ) + total_sent += n_sent + out_byte_idx += n_sent + if out_byte_idx == self.size: + self.logger.log("%s Sent message %d" % (self.prefix, out_list_idx)) + out_byte_idx = 0 + out_list_idx += 1 + sel.modify(self.sock, selectors.EVENT_READ) # turn off write events + out_ready_to_send = False # turn on when rcvr receives else: - out_ready_to_send = True - sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) - elif len(payload_in[in_list_idx]) > size: - error = "CRITICAL Rcvd message too big. Expected:%d, actual:%d" % \ - (size, len(payload_in[in_list_idx])) - logger.log(error) - raise Exception(error) - else: - pass # still accumulating a message - else: - # socket closed - keep_going = False - if mask & selectors.EVENT_WRITE: - if out_ready_to_send: - n_sent = sock.send( payload_out[out_list_idx][out_byte_idx:] ) - total_sent += n_sent - out_byte_idx += n_sent - if out_byte_idx == size: - logger.log("Sent message %d" % out_list_idx) - out_byte_idx = 0 - out_list_idx += 1 - sel.modify(sock, selectors.EVENT_READ) # turn off write events - out_ready_to_send = False # turn on when rcvr receives - else: - logger.log("DEBUG: ignoring EVENT_WRITE") - - # shut down - sel.unregister(sock) - sock.close() + pass # logger.log("DEBUG: ignoring EVENT_WRITE") + + # shut down + sel.unregister(self.sock) + self.sock.close() + + except Exception as exc: + self.error = "ERROR: exception : '%s'" % traceback.format_exc() + + self.is_running = False + + def wait(self, timeout=TIMEOUT): + self.logger.log("%s Client is shutting down" % (self.prefix)) + self.keep_running = False + self._thread.join(timeout) + def main(argv): - try: - # parse args - p = argparse.ArgumentParser() - p.add_argument('--host', '-b', - help='Required target host') - p.add_argument('--port', '-p', type=int, - help='Required target port number') - p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?', - help='Size of payload in bytes') - p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?', - help='Number of payloads to process') - p.add_argument('--name', - help='Optional logger prefix') - p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", - help='Timeout in seconds. Default value "0" disables timeouts') - p.add_argument('--log', '-l', - action='store_true', - help='Write activity log to console') - del argv[0] - args = p.parse_args(argv) - - # host - if args.host is None: - raise Exception("User must specify a host") - host = args.host - - # port - if args.port is None: - raise Exception("User must specify a port number") - port = args.port - - # size - if args.size <= 0: - raise Exception("Size must be greater than zero") - size = args.size - - # count - if args.count <= 0: - raise Exception("Count must be greater than zero") - count = args.count - - # name / prefix - prefix = args.name if args.name is not None else "ECHO_CLIENT" - - # timeout - if args.timeout < 0.0: - raise Exception("Timeout must be greater than or equal to zero") + retval = 0 + # parse args + p = argparse.ArgumentParser() + p.add_argument('--host', '-b', + help='Required target host') + p.add_argument('--port', '-p', type=int, + help='Required target port number') + p.add_argument('--size', '-s', type=int, default=100, const=1, nargs='?', + help='Size of payload in bytes') + p.add_argument('--count', '-c', type=int, default=1, const=1, nargs='?', + help='Number of payloads to process') + p.add_argument('--name', + help='Optional logger prefix') + p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", + help='Timeout in seconds. Default value "0" disables timeouts') + p.add_argument('--log', '-l', + action='store_true', + help='Write activity log to console') + del argv[0] + args = p.parse_args(argv) + + # host + if args.host is None: + raise Exception("User must specify a host") + host = args.host + + # port + if args.port is None: + raise Exception("User must specify a port number") + port = args.port + + # size + if args.size <= 0: + raise Exception("Size must be greater than zero") + size = args.size + # count + if args.count <= 0: + raise Exception("Count must be greater than zero") + count = args.count + + # name / prefix + prefix = args.name if args.name is not None else "ECHO_CLIENT (%d_%d_%d)" % \ + (port, size, count) + + # timeout + if args.timeout < 0.0: + raise Exception("Timeout must be greater than or equal to zero") + + killer = GracefulKiller() + client = None + + try: # logging - logger = EchoLogger(prefix=prefix, - title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count), - print_to_console = args.log, - save_for_dump = False) + logger = Logger(title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count), + print_to_console = args.log, + save_for_dump = False) - main_except(host, port, size, count, args.timeout, logger) - return 0 + client = TcpEchoClient(prefix, host, port, size, count, args.timeout, logger) - except KeyboardInterrupt: - logger.log("Exiting due to KeyboardInterrupt") - return 0 + keep_running = True + while keep_running: + time.sleep(0.1) + if client.error is not None: + logger.log("%s Client stopped with error: %s" % (prefix, client.error)) + keep_running = False + retval = 1 + if client.exit_status is not None: + logger.log("%s Client stopped with status: %s" % (prefix, client.exit_status)) + keep_running = False + if killer.kill_now: + logger.log("%s Process killed with signal" % (prefix)) + keep_running = False + if keep_running and not client.is_running: + logger.log("%s Client stopped with no error or status" % (prefix)) + keep_running = False except Exception as e: - traceback.print_exc() - return 1 + logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) + retval = 1 + return retval if __name__ == "__main__": sys.exit(main(sys.argv)) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index 28fc2ae..34f72bb 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -19,19 +19,24 @@ # under the License. # +from __future__ import unicode_literals +from __future__ import division +from __future__ import absolute_import +from __future__ import print_function + import argparse import os import selectors -from signal import signal, SIGINT +import signal import socket import sys +from threading import Thread import time import traceback import types from system_test import Logger - -HOST = '127.0.0.1' +from system_test import TIMEOUT class ClientRecord(object): """ @@ -54,176 +59,234 @@ class ClientRecord(object): return self.__repr__() -class EchoLogger(Logger): - def __init__(self, prefix="ECHO_LOGGER", title="EchoLogger", print_to_console=False, save_for_dump=False): - self.prefix = prefix + ' ' if len(prefix) > 0 else '' - super(EchoLogger, self).__init__(title=title, print_to_console=print_to_console, save_for_dump=save_for_dump) - - def log(self, msg): - super(EchoLogger, self).log(self.prefix + msg) +class GracefulKiller: + kill_now = False + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self,signum, frame): + self.kill_now = True def split_chunk_for_display(raw_bytes): """ Given some raw bytes, return a display string - Only show the beginning and end of largish (2xMAGIC_SIZE) arrays. + Only show the beginning and end of largish (2x CONTENT_CHUNK_SIZE) arrays. :param raw_bytes: :return: display string """ - MAGIC_SIZE = 50 # Content repeats after chunks this big - used by echo client, too - if len(raw_bytes) > 2 * MAGIC_SIZE: - result = repr(raw_bytes[:MAGIC_SIZE]) + " ... " + repr(raw_bytes[-MAGIC_SIZE:]) + CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo client, too + if len(raw_bytes) > 2 * CONTENT_CHUNK_SIZE: + result = repr(raw_bytes[:CONTENT_CHUNK_SIZE]) + " ... " + repr(raw_bytes[-CONTENT_CHUNK_SIZE:]) else: result = repr(raw_bytes) return result -def main_except(sock, port, echo_count, timeout, logger): - ''' - :param lsock: socket to listen on - :param port: port to listen on - :param echo_count: exit after echoing this many bytes - :param timeout: exit after this many seconds - :param logger: Logger() object - :return: - ''' - # set up spontaneous exit settings - start_time = time.time() - total_echoed = 0 - - # set up listening socket - sock.bind((HOST, port)) - sock.listen() - sock.setblocking(False) - logger.log('Listening on host:%s, port:%d' % (HOST, port)) - - # set up selector - sel = selectors.DefaultSelector() - sel.register(sock, selectors.EVENT_READ, data=None) - - # event loop - while True: - if timeout > 0.0: - elapsed = time.time() - start_time - if elapsed > timeout: - logger.log("Exiting due to timeout. Total echoed = %d" % total_echoed) - break - if echo_count > 0: - if total_echoed >= echo_count: - logger.log("Exiting due to echo byte count. Total echoed = %d" % total_echoed) - break - events = sel.select(timeout=0.1) - if events: - for key, mask in events: - if key.data is None: - if key.fileobj is sock: - do_accept(key.fileobj, sel, logger) - else: - raise Exception("Only listener 'sock' has None in opaque data field") +class TcpEchoServer(): + def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None): + ''' + Start echo server in separate thread + + :param prefix: log prefix + :param port: port to listen on + :param echo_count: exit after echoing this many bytes + :param timeout: exit after this many seconds + :param logger: Logger() object + :return: + ''' + self.sock = None + self.prefix = prefix + self.port = port + self.echo_count = echo_count + self.timeout = timeout + self.logger = logger + self.keep_running = True + self.HOST = '127.0.0.1' + self.is_running = False + self.exit_status = None + self.error = None + self._thread = Thread(target=self.run) + self._thread.daemon = True + self._thread.start() + + def run(self): + """ + Run server in daemon thread + :return: + """ + try: + # set up spontaneous exit settings + self.is_running = True + start_time = time.time() + total_echoed = 0 + + # set up listening socket + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.bind((self.HOST, self.port)) + self.sock.listen() + self.sock.setblocking(False) + self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port)) + + # set up selector + sel = selectors.DefaultSelector() + sel.register(self.sock, selectors.EVENT_READ, data=None) + + # event loop + while True: + if not self.keep_running: + self.exit_status = "INFO: command shutdown:" + break + if self.timeout > 0.0: + elapsed = time.time() - start_time + if elapsed > self.timeout: + self.exit_status = "Exiting due to timeout. Total echoed = %d" % total_echoed + break + if self.echo_count > 0: + if total_echoed >= self.echo_count: + self.exit_status = "Exiting due to echo byte count. Total echoed = %d" % total_echoed + break + events = sel.select(timeout=0.1) + if events: + for key, mask in events: + if key.data is None: + if key.fileobj is self.sock: + self.do_accept(key.fileobj, sel, self.logger) + else: + pass # Only listener 'sock' has None in opaque data field + else: + total_echoed += self.do_service(key, mask, sel, self.logger) + else: + pass # select timeout. probably. + + sel.unregister(self.sock) + self.sock.close() + + except Exception as exc: + self.error = "ERROR: exception : '%s'" % traceback.format_exc() + + self.is_running = False + + def do_accept(self, sock, sel, logger): + conn, addr = sock.accept() + logger.log('%s Accepted connection from %s:%d' % (self.prefix, addr[0], addr[1])) + conn.setblocking(False) + events = selectors.EVENT_READ | selectors.EVENT_WRITE + sel.register(conn, events, data=ClientRecord(addr)) + + def do_service(self, key, mask, sel, logger): + retval = 0 + sock = key.fileobj + data = key.data + if mask & selectors.EVENT_READ: + try: + recv_data = sock.recv(1024) + except ConnectionResetError as exc: + logger.log('%s Connection to %s:%d closed by peer' % (self.prefix, data.addr[0], data.addr[1])) + sel.unregister(sock) + sock.close() + return 0 + if recv_data: + data.outb += recv_data + logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data), + split_chunk_for_display(recv_data))) + sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) + else: + logger.log('%s Closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1])) + sel.unregister(sock) + sock.close() + return 0 + if mask & selectors.EVENT_WRITE: + if data.outb: + sent = sock.send(data.outb) + retval += sent + if sent > 0: + logger.log('%s write to : %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], sent, + split_chunk_for_display(data.outb[:sent]))) else: - total_echoed += do_service(key, mask, sel, logger) - else: - pass # select timeout. probably. + logger.log('%s write to : %s:%d len:0' % (self.prefix, data.addr[0], data.addr[1])) + data.outb = data.outb[sent:] + else: + sel.modify(sock, selectors.EVENT_READ, data=data) + return retval - sel.unregister(sock) - sock.close() + def wait(self, timeout=TIMEOUT): + self.logger.log("%s Server is shutting down" % (self.prefix)) + self.keep_running = False + self._thread.join(timeout) -def do_accept(sock, sel, logger): - conn, addr = sock.accept() - logger.log('Accepted connection from %s:%d' % (addr[0], addr[1])) - conn.setblocking(False) - events = selectors.EVENT_READ | selectors.EVENT_WRITE - sel.register(conn, events, data=ClientRecord(addr)) - -def do_service(key, mask, sel, logger): - retval = 0 - sock = key.fileobj - data = key.data - if mask & selectors.EVENT_READ: - recv_data = sock.recv(1024) - if recv_data: - data.outb += recv_data - logger.log('read from: %s:%d len:%d: %s' % (data.addr[0], data.addr[1], len(recv_data), - split_chunk_for_display(recv_data))) - sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) - else: - logger.log('Closing connection to %s:%d' % (data.addr[0], data.addr[1])) - sel.unregister(sock) - sock.close() - return 0 - if mask & selectors.EVENT_WRITE: - if data.outb: - sent = sock.send(data.outb) - retval += sent - if sent > 0: - logger.log('write to : %s:%d len:%d: %s' % (data.addr[0], data.addr[1], sent, - split_chunk_for_display(data.outb[:sent]))) - else: - logger.log('write to : %s:%d len:0' % (data.addr[0], data.addr[1])) - data.outb = data.outb[sent:] - else: - #logger.log('write event with no data' + str(data)) - sel.modify(sock, selectors.EVENT_READ, data=data) - return retval def main(argv): retval = 0 - try: - # parse args - p = argparse.ArgumentParser() - p.add_argument('--port', '-p', - help='Required listening port number') - p.add_argument('--name', - help='Optional logger prefix') - p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?", - help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.') - p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", - help='Timeout in seconds. Default value "0" disables timeouts') - p.add_argument('--log', '-l', - action='store_true', - help='Write activity log to console') - del argv[0] - args = p.parse_args(argv) - - lsock = None - - # port - if args.port is None: - raise Exception("User must specify a port number") - port = int(args.port) - - # name / prefix - prefix = args.name if args.name is not None else "ECHO_SERVER" - - # echo - if args.echo < 0: - raise Exception("Echo count must be greater than zero") - - # timeout - if args.timeout < 0.0: - raise Exception("Timeout must be greater than or equal to zero") + # parse args + p = argparse.ArgumentParser() + p.add_argument('--port', '-p', + help='Required listening port number') + p.add_argument('--name', + help='Optional logger prefix') + p.add_argument('--echo', '-e', type=int, default=0, const=1, nargs="?", + help='Exit after echoing this many bytes. Default value "0" disables exiting on byte count.') + p.add_argument('--timeout', '-t', type=float, default=0.0, const=1, nargs="?", + help='Timeout in seconds. Default value "0" disables timeouts') + p.add_argument('--log', '-l', + action='store_true', + help='Write activity log to console') + del argv[0] + args = p.parse_args(argv) + + # port + if args.port is None: + raise Exception("User must specify a port number") + port = int(args.port) + + # name / prefix + prefix = args.name if args.name is not None else "ECHO_SERVER (%s)" % (str(port)) + + # echo + if args.echo < 0: + raise Exception("Echo count must be greater than zero") + + # timeout + if args.timeout < 0.0: + raise Exception("Timeout must be greater than or equal to zero") + + killer = GracefulKiller() + server = None + try: # logging - logger = EchoLogger(prefix = prefix, - title = "%s port %d" % (prefix, port), - print_to_console = args.log, - save_for_dump = False) + logger = Logger(title = "%s port %d" % (prefix, port), + print_to_console = args.log, + save_for_dump = False) + + server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger) + + keep_running = True + while keep_running: + time.sleep(0.1) + if server.error is not None: + logger.log("%s Server stopped with error: %s" % (prefix, server.error)) + keep_running = False + retval = 1 + if server.exit_status is not None: + logger.log("%s Server stopped with status: %s" % (prefix, server.exit_status)) + keep_running = False + if killer.kill_now: + logger.log("%s Process killed with signal" % (prefix)) + keep_running = False + if keep_running and not server.is_running: + logger.log("%s Server stopped with no error or status" % (prefix)) + keep_running = False - # the listening socket - lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - - main_except(lsock, port, args.echo, args.timeout, logger) - - except KeyboardInterrupt: - logger.log("Exiting due to KeyboardInterrupt. Total echoed = %d" % total_echoed) except Exception as e: - logger.log("Exiting due to Exception. Total echoed = %d" % total_echoed) - traceback.print_exc() + logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) retval = 1 - if lsock is not None: - lsock.close() + if server is not None and server.sock is not None: + server.sock.close() + return retval diff --git a/tests/system_tests_tcp_adaptor.py b/tests/system_tests_tcp_adaptor.py index 8120937..c2df88d 100644 --- a/tests/system_tests_tcp_adaptor.py +++ b/tests/system_tests_tcp_adaptor.py @@ -23,38 +23,24 @@ from __future__ import absolute_import from __future__ import print_function import os +import traceback from time import sleep from threading import Event from threading import Timer -from proton import Message, Timeout -from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy, TestTimeout, PollTimeout -from system_test import AsyncTestReceiver -from system_test import AsyncTestSender +from system_test import TestCase, Qdrouterd, main_module, TIMEOUT from system_test import Logger from system_test import QdManager from system_test import unittest from system_test import Process -from system_tests_link_routes import ConnLinkRouteService -from test_broker import FakeBroker -from test_broker import FakeService -from proton.handlers import MessagingHandler -from proton.reactor import Container, DynamicNodeProperties -from proton.utils import BlockingConnection +from system_test import DIR from qpid_dispatch.management.client import Node -from qpid_dispatch_internal.tools.command import version_supports_mutually_exclusive_arguments from subprocess import PIPE, STDOUT -import re +from TCP_echo_client import TcpEchoClient +from TCP_echo_server import TcpEchoServer -class AddrTimer(object): - def __init__(self, parent): - self.parent = parent - def on_timer_task(self, event): - self.parent.check_address() - - -class TcpAdaptorOneRouterEcho(TestCase): +class TcpAdaptorOneRouterEcho(TestCase, Process): """ Run echo tests through a stand-alone router """ @@ -64,7 +50,7 @@ class TcpAdaptorOneRouterEcho(TestCase): @classmethod def setUpClass(cls): - """Start a router and echo server""" + """Start a router""" super(TcpAdaptorOneRouterEcho, cls).setUpClass() def router(name, mode, l_amqp, l_tcp_client, l_tcp_server, addr, site, extra=None): @@ -92,54 +78,87 @@ class TcpAdaptorOneRouterEcho(TestCase): cls.tcp_client_listener_port = cls.tester.get_port() cls.tcp_server_listener_port = cls.tester.get_port() - router('INT.A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port, + router('A', 'interior', cls.amqp_listener_port, cls.tcp_client_listener_port, cls.tcp_server_listener_port, "some_address", "best_site") - cls.logger = Logger(title="TCP echo one router", print_to_console=True) - - @classmethod - def tearDownClass(cls): - pass - - def spawn_echo_server(self, port, expect=None): - cmd = ["TCP_echo_server.py", - "--port", str(port), - "--log"] - return self.popen(cmd, name='echo-server', stdout=PIPE, expect=expect, - universal_newlines=True) - - def spawn_echo_client(self, logger, host, port, size, count, expect=None): - if expect is None: - expect = Process.EXIT_OK - cmd = ["TCP_echo_client.py", - "--host", host, - "--port", str(port), - "--size", str(size), - "--count", str(count), - "--log"] - logger.log("Start client. cmd=%s" % str(cmd)) - return self.popen(cmd, name='echo-clint', stdout=PIPE, expect=expect, - universal_newlines=True) - - def do_test_echo(self, logger, host, port, size, count): - # start echo client - client = self.spawn_echo_client(logger, host, port, size, count) - cl_text, cl_error = client.communicate(timeout=TIMEOUT) - if client.returncode: - raise Exception("Echo client failed size:%d, count:%d : %s %s" % - (size, count, cl_text, cl_error)) + cls.logger = Logger(title="TcpAdaptorOneRouterEcho-testClass", print_to_console=True) + + def do_test_echo(self, test_name, logger, host, port, size, count): + # Run echo client. Return true if it works. + name = "%s_%s_%s_%s" % (test_name, port, size, count) + client_prefix = "ECHO_CLIENT %s" % name + client_logger = Logger(title=client_prefix, print_to_console=False, save_for_dump=True) + result = True # assume it works + try: + # start client + client = TcpEchoClient(prefix=client_prefix, + host=host, + port=port, + size=size, + count=count, + timeout=TIMEOUT, + logger=client_logger) + #assert client.is_running + + # wait for client to finish + keep_waiting = True + while keep_waiting: + sleep(0.1) + if client.error is not None: + logger.log("%s Client stopped with error: %s" % (name, client.error)) + keep_waiting = False + result = False + if client.exit_status is not None: + logger.log("%s Client stopped with status: %s" % (name, client.exit_status)) + keep_waiting = False + result = False + if keep_waiting and not client.is_running: + logger.log("%s Client stopped with no error or status" % (name)) + keep_waiting = False + + except Exception as exc: + logger.log("EchoClient %s failed. Exception: %s" % + (name, traceback.format_exc())) + result = False + + if not result: + # On failure, dump the client log through the test log. Compound logs here we go + for line in client_logger.logs: + logger.log("Failed client log: %s" % line) + return result def test_01_tcp_echo_one_router(self): + """ + Run one echo server. + Run many echo clients. + :return: + """ # start echo server - #server = self.spawn_echo_server(self.tcp_server_listener_port) - - #for size in [1, 5, 10, 50, 100]: - # for count in [1, 5, 10, 50, 100]: - # self.logger.log("Starting echo client host:localhost, port:%d, size:%d, count:%d" % - # (self.tcp_client_listener_port, size, count)) - # self.do_test_echo(self.logger, "localhost", self.tcp_client_listener_port, size, count) - #server.join() - pass + test_name = "test_01_tcp_echo_one_router" + server_prefix = "ECHO_SERVER %s" % test_name + server_logger = Logger(title=test_name, print_to_console=False, save_for_dump=True) + server = TcpEchoServer(prefix=server_prefix, + port=self.tcp_server_listener_port, + timeout=TIMEOUT, + logger=server_logger) + assert server.is_running + + # run series of clients to test + result = True + for size in [1]: + for count in [1]: + test_info = "Starting echo client %s host:localhost, port:%d, size:%d, count:%d" % \ + (test_name, self.tcp_client_listener_port, size, count) + self.logger.log(test_info) + result = self.do_test_echo(test_name, self.logger, "localhost", + self.tcp_client_listener_port, size, count) + if not result: + break + if not result: + break + # stop echo server + server.wait() + assert result, "Test case failed %s" % test_info if __name__== '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org