This is an automated email from the ASF dual-hosted git repository. chug pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new 6c88bd7 DISPATCH-1848: TCP echo client/server handle socket errors better 6c88bd7 is described below commit 6c88bd7cfa40d00e3e8e007ea2368226bfe1cb6f Author: Chuck Rolke <c...@apache.org> AuthorDate: Wed Nov 18 15:13:56 2020 -0500 DISPATCH-1848: TCP echo client/server handle socket errors better And various PEP-8 compliance changes. --- tests/TCP_echo_client.py | 85 ++++++++++++++++++++++--------------------- tests/TCP_echo_server.py | 94 ++++++++++++++++++++++++++++-------------------- 2 files changed, 101 insertions(+), 78 deletions(-) diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py index 8465ad5..56280aa 100755 --- a/tests/TCP_echo_client.py +++ b/tests/TCP_echo_client.py @@ -38,14 +38,16 @@ import types from system_test import Logger from system_test import TIMEOUT -class GracefulKiller: - kill_now = False - def __init__(self): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) - def exit_gracefully(self,signum, frame): - self.kill_now = True +class GracefulExitSignaler: + kill_now = False + + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self, signum, frame): + self.kill_now = True def split_chunk_for_display(raw_bytes): @@ -63,24 +65,24 @@ def split_chunk_for_display(raw_bytes): return result -class TcpEchoClient(): +class TcpEchoClient: def __init__(self, prefix, host, port, size, count, timeout, logger): - ''' + """ :param host: connect to this host :param port: connect to this port :param size: size of individual payload chunks in bytes :param count: number of payload chunks - :param strategy: "1" Send one payload; # TODO + :param strategy: "1" Send one payload; # TODO more strategies Recv one payload :param logger: Logger() object :return: - ''' + """ # Start up self.sock = None self.prefix = prefix self.host = host - self.port = port + self.port = int(port) self.size = size self.count = count self.timeout = timeout @@ -94,12 +96,12 @@ class TcpEchoClient(): self._thread.start() def run(self): - self.logger.log("%s Client is starting up" % (self.prefix)) + self.logger.log("%s Client is starting up" % self.prefix) try: start_time = time.time() self.is_running = True self.logger.log('%s Connecting to host:%s, port:%d, size:%d, count:%d' % - (self.prefix, self.host, self.port, self.size, self.count)) + (self.prefix, self.host, self.port, self.size, self.count)) total_sent = 0 total_rcvd = 0 @@ -115,7 +117,7 @@ class TcpEchoClient(): # port: 33333 # index: 6 # offset into message: 0 - CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too + CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - used by echo server, too for idx in range(self.count): body_msg = "" padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30] @@ -128,8 +130,8 @@ class TcpEchoClient(): body_msg = body_msg[:self.size] payload_out.append(bytearray(body_msg.encode())) # incoming payloads - payload_in = [] - in_list_idx = 0 # current _in array being received + payload_in = [] + in_list_idx = 0 # current _in array being received for i in range(self.count): payload_in.append(bytearray()) @@ -145,13 +147,13 @@ class TcpEchoClient(): selectors.EVENT_READ | selectors.EVENT_WRITE) # event loop - time.sleep(0.1) # DISPATCH-1820 investigation + time.sleep(0.1) # DISPATCH-1820 investigation while self.keep_running: if self.timeout > 0.0: elapsed = time.time() - start_time if elapsed > self.timeout: self.exit_status = "%s Exiting due to timeout. Total sent= %d, total rcvd= %d" % \ - (self.prefix, total_sent, total_rcvd) + (self.prefix, total_sent, total_rcvd) break for key, mask in sel.select(timeout=0.1): sock = key.fileobj @@ -170,48 +172,49 @@ class TcpEchoClient(): for idxc in range(self.count): for idxs in range(self.size): ob = payload_out[idxc][idxs] - ib = payload_in [idxc][idxs] + ib = payload_in[idxc][idxs] if ob != ib: - self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, expected:%s, actual:%s" \ - % (self.prefix, idxc, idxs, repr(ob), repr(ib)) + self.error = "%s ERROR Rcvd message verify fail. row:%d, col:%d, " \ + "expected:%s, actual:%s" \ + % (self.prefix, idxc, idxs, repr(ob), repr(ib)) break else: out_ready_to_send = True sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE) elif len(payload_in[in_list_idx]) > self.size: self.error = "ERROR Received message too big. Expected:%d, actual:%d" % \ - (self.size, len(payload_in[in_list_idx])) + (self.size, len(payload_in[in_list_idx])) break else: - pass # still accumulating a message + pass # still accumulating a message else: # socket closed self.keep_running = False if self.keep_running and mask & selectors.EVENT_WRITE: if out_ready_to_send: - n_sent = self.sock.send( payload_out[out_list_idx][out_byte_idx:] ) + n_sent = self.sock.send(payload_out[out_list_idx][out_byte_idx:]) total_sent += n_sent out_byte_idx += n_sent if out_byte_idx == self.size: self.logger.log("%s Sent message %d" % (self.prefix, out_list_idx)) out_byte_idx = 0 out_list_idx += 1 - sel.modify(self.sock, selectors.EVENT_READ) # turn off write events - out_ready_to_send = False # turn on when rcvr receives + sel.modify(self.sock, selectors.EVENT_READ) # turn off write events + out_ready_to_send = False # turn on when rcvr receives else: - pass # logger.log("DEBUG: ignoring EVENT_WRITE") + pass # logger.log("DEBUG: ignoring EVENT_WRITE") # shut down sel.unregister(self.sock) self.sock.close() - except Exception as exc: + except Exception: self.error = "ERROR: exception : '%s'" % traceback.format_exc() self.is_running = False def wait(self, timeout=TIMEOUT): - self.logger.log("%s Client is shutting down" % (self.prefix)) + self.logger.log("%s Client is shutting down" % self.prefix) self.keep_running = False self._thread.join(timeout) @@ -266,14 +269,14 @@ def main(argv): if args.timeout < 0.0: raise Exception("Timeout must be greater than or equal to zero") - killer = GracefulKiller() - client = None + signaller = GracefulExitSignaler() + logger = None try: # logging - logger = Logger(title = "%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count), - print_to_console = args.log, - save_for_dump = False) + logger = Logger(title="%s host:%s port %d size:%d count:%d" % (prefix, host, port, size, count), + print_to_console=args.log, + save_for_dump=False) client = TcpEchoClient(prefix, host, port, size, count, args.timeout, logger) @@ -287,18 +290,20 @@ def main(argv): if client.exit_status is not None: logger.log("%s Client stopped with status: %s" % (prefix, client.exit_status)) keep_running = False - if killer.kill_now: - logger.log("%s Process killed with signal" % (prefix)) + if signaller.kill_now: + logger.log("%s Process killed with signal" % prefix) keep_running = False if keep_running and not client.is_running: - logger.log("%s Client stopped with no error or status" % (prefix)) + logger.log("%s Client stopped with no error or status" % prefix) keep_running = False - except Exception as e: - logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) + except Exception: + if logger is not None: + logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) retval = 1 return retval + if __name__ == "__main__": sys.exit(main(sys.argv)) diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py index e74c780..b3c3ba0 100755 --- a/tests/TCP_echo_server.py +++ b/tests/TCP_echo_server.py @@ -38,6 +38,7 @@ import types from system_test import Logger from system_test import TIMEOUT + class ClientRecord(object): """ Object to register with the selector 'data' field @@ -59,14 +60,15 @@ class ClientRecord(object): return self.__repr__() -class GracefulKiller: - kill_now = False - def __init__(self): - signal.signal(signal.SIGINT, self.exit_gracefully) - signal.signal(signal.SIGTERM, self.exit_gracefully) +class GracefulExitSignaler: + kill_now = False + + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) - def exit_gracefully(self,signum, frame): - self.kill_now = True + def exit_gracefully(self, signum, frame): + self.kill_now = True def split_chunk_for_display(raw_bytes): @@ -84,9 +86,10 @@ def split_chunk_for_display(raw_bytes): return result -class TcpEchoServer(): +class TcpEchoServer: + def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, timeout=0.0, logger=None): - ''' + """ Start echo server in separate thread :param prefix: log prefix @@ -95,10 +98,10 @@ class TcpEchoServer(): :param timeout: exit after this many seconds :param logger: Logger() object :return: - ''' + """ self.sock = None self.prefix = prefix - self.port = port + self.port = int(port) self.echo_count = echo_count self.timeout = timeout self.logger = logger @@ -128,10 +131,10 @@ class TcpEchoServer(): self.sock.bind((self.HOST, self.port)) self.sock.listen() self.sock.setblocking(False) - self.logger.log('%s Listening on host:%s, port:%d' % (self.prefix, self.HOST, self.port)) - except Exception as exc: - self.error = ('%s Opening listen socket %s:%d exception: %s' % - (self.prefix, self.HOST, self.port, traceback.format_exc())) + self.logger.log('%s Listening on host:%s, port:%s' % (self.prefix, self.HOST, self.port)) + except Exception: + self.error = ('%s Opening listen socket %s:%s exception: %s' % + (self.prefix, self.HOST, self.port, traceback.format_exc())) self.logger.log(self.error) return 1 @@ -160,16 +163,16 @@ class TcpEchoServer(): if key.fileobj is self.sock: self.do_accept(key.fileobj, sel, self.logger) else: - pass # Only listener 'sock' has None in opaque data field + pass # Only listener 'sock' has None in opaque data field else: total_echoed += self.do_service(key, mask, sel, self.logger) else: - pass # select timeout. probably. + pass # select timeout. probably. sel.unregister(self.sock) self.sock.close() - except Exception as exc: + except Exception: self.error = "ERROR: exception : '%s'" % traceback.format_exc() self.is_running = False @@ -188,15 +191,15 @@ class TcpEchoServer(): if mask & selectors.EVENT_READ: try: recv_data = sock.recv(1024) - except Exception as exc: - logger.log('%s Connection to %s:%d closed by peer' % - (self.prefix, data.addr[0], data.addr[1])) + except IOError: + logger.log('%s Connection to %s:%d IOError: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) sel.unregister(sock) sock.close() return 0 - except Exception as exc: + except Exception: self.error = ('%s Connection to %s:%d exception: %s' % - (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) logger.log(self.error) sel.unregister(sock) sock.close() @@ -204,7 +207,7 @@ class TcpEchoServer(): if recv_data: data.outb += recv_data logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], len(recv_data), - split_chunk_for_display(recv_data))) + split_chunk_for_display(recv_data))) sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, data=data) else: logger.log('%s Closing connection to %s:%d' % (self.prefix, data.addr[0], data.addr[1])) @@ -213,11 +216,25 @@ class TcpEchoServer(): return 0 if mask & selectors.EVENT_WRITE: if data.outb: - sent = sock.send(data.outb) + try: + sent = sock.send(data.outb) + except IOError: + logger.log('%s Connection to %s:%d IOError: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) + sel.unregister(sock) + sock.close() + return 0 + except Exception: + self.error = ('%s Connection to %s:%d exception: %s' % + (self.prefix, data.addr[0], data.addr[1], traceback.format_exc())) + logger.log(self.error) + sel.unregister(sock) + sock.close() + return 1 retval += sent if sent > 0: logger.log('%s write to : %s:%d len:%d: %s' % (self.prefix, data.addr[0], data.addr[1], sent, - split_chunk_for_display(data.outb[:sent]))) + split_chunk_for_display(data.outb[:sent]))) else: logger.log('%s write to : %s:%d len:0' % (self.prefix, data.addr[0], data.addr[1])) data.outb = data.outb[sent:] @@ -226,13 +243,14 @@ class TcpEchoServer(): return retval def wait(self, timeout=TIMEOUT): - self.logger.log("%s Server is shutting down" % (self.prefix)) + self.logger.log("%s Server is shutting down" % self.prefix) self.keep_running = False self._thread.join(timeout) def main(argv): retval = 0 + logger = None # parse args p = argparse.ArgumentParser() p.add_argument('--port', '-p', @@ -252,7 +270,7 @@ def main(argv): # port if args.port is None: raise Exception("User must specify a port number") - port = int(args.port) + port = args.port # name / prefix prefix = args.name if args.name is not None else "ECHO_SERVER (%s)" % (str(port)) @@ -265,14 +283,14 @@ def main(argv): if args.timeout < 0.0: raise Exception("Timeout must be greater than or equal to zero") - killer = GracefulKiller() + signaller = GracefulExitSignaler() server = None try: # logging - logger = Logger(title = "%s port %d" % (prefix, port), - print_to_console = args.log, - save_for_dump = False) + logger = Logger(title="%s port %s" % (prefix, port), + print_to_console=args.log, + save_for_dump=False) server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger) @@ -286,16 +304,16 @@ def main(argv): if server.exit_status is not None: logger.log("%s Server stopped with status: %s" % (prefix, server.exit_status)) keep_running = False - if killer.kill_now: - logger.log("%s Process killed with signal" % (prefix)) + if signaller.kill_now: + logger.log("%s Process killed with signal" % prefix) keep_running = False if keep_running and not server.is_running: - logger.log("%s Server stopped with no error or status" % (prefix)) + logger.log("%s Server stopped with no error or status" % prefix) keep_running = False - - except Exception as e: - logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) + except Exception: + if logger is not None: + logger.log("%s Exception: %s" % (prefix, traceback.format_exc())) retval = 1 if server is not None and server.sock is not None: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org