This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
     new 6c88bd7  DISPATCH-1848: TCP echo client/server handle socket errors better
6c88bd7 is described below

commit 6c88bd7cfa40d00e3e8e007ea2368226bfe1cb6f
Author: Chuck Rolke <c...@apache.org>
AuthorDate: Wed Nov 18 15:13:56 2020 -0500

    DISPATCH-1848: TCP echo client/server handle socket errors better
    
    And various PEP-8 compliance changes.
---
 tests/TCP_echo_client.py | 85 ++++++++++++++++++++++---------------------
 tests/TCP_echo_server.py | 94 ++++++++++++++++++++++++++++--------------------
 2 files changed, 101 insertions(+), 78 deletions(-)

diff --git a/tests/TCP_echo_client.py b/tests/TCP_echo_client.py
index 8465ad5..56280aa 100755
--- a/tests/TCP_echo_client.py
+++ b/tests/TCP_echo_client.py
@@ -38,14 +38,16 @@ import types
 from system_test import Logger
 from system_test import TIMEOUT
 
-class GracefulKiller:
-  kill_now = False
-  def __init__(self):
-    signal.signal(signal.SIGINT, self.exit_gracefully)
-    signal.signal(signal.SIGTERM, self.exit_gracefully)
 
-  def exit_gracefully(self,signum, frame):
-    self.kill_now = True
+class GracefulExitSignaler:
+    kill_now = False
+
+    def __init__(self):
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
+
+    def exit_gracefully(self, signum, frame):
+        self.kill_now = True
 
 
 def split_chunk_for_display(raw_bytes):
@@ -63,24 +65,24 @@ def split_chunk_for_display(raw_bytes):
     return result
 
 
-class TcpEchoClient():
+class TcpEchoClient:
 
     def __init__(self, prefix, host, port, size, count, timeout, logger):
-        '''
+        """
         :param host: connect to this host
         :param port: connect to this port
         :param size: size of individual payload chunks in bytes
         :param count: number of payload chunks
-        :param strategy: "1" Send one payload;  # TODO
+        :param strategy: "1" Send one payload;  # TODO more strategies
                              Recv one payload
         :param logger: Logger() object
         :return:
-        '''
+        """
         # Start up
         self.sock = None
         self.prefix = prefix
         self.host = host
-        self.port = port
+        self.port = int(port)
         self.size = size
         self.count = count
         self.timeout = timeout
@@ -94,12 +96,12 @@ class TcpEchoClient():
         self._thread.start()
 
     def run(self):
-        self.logger.log("%s Client is starting up" % (self.prefix))
+        self.logger.log("%s Client is starting up" % self.prefix)
         try:
             start_time = time.time()
             self.is_running = True
             self.logger.log('%s Connecting to host:%s, port:%d, size:%d, 
count:%d' %
-                       (self.prefix, self.host, self.port, self.size, 
self.count))
+                            (self.prefix, self.host, self.port, self.size, 
self.count))
             total_sent = 0
             total_rcvd = 0
 
@@ -115,7 +117,7 @@ class TcpEchoClient():
             #    port: 33333
             #    index: 6
             #    offset into message: 0
-            CONTENT_CHUNK_SIZE = 50 # Content repeats after chunks this big - 
used by echo server, too
+            CONTENT_CHUNK_SIZE = 50  # Content repeats after chunks this big - 
used by echo server, too
             for idx in range(self.count):
                 body_msg = ""
                 padchar = "abcdefghijklmnopqrstuvwxyz@#$%"[idx % 30]
@@ -128,8 +130,8 @@ class TcpEchoClient():
                     body_msg = body_msg[:self.size]
                 payload_out.append(bytearray(body_msg.encode()))
             # incoming payloads
-            payload_in  = []
-            in_list_idx = 0 # current _in array being received
+            payload_in = []
+            in_list_idx = 0  # current _in array being received
             for i in range(self.count):
                 payload_in.append(bytearray())
 
@@ -145,13 +147,13 @@ class TcpEchoClient():
                          selectors.EVENT_READ | selectors.EVENT_WRITE)
 
             # event loop
-            time.sleep(0.1) # DISPATCH-1820 investigation
+            time.sleep(0.1)  # DISPATCH-1820 investigation
             while self.keep_running:
                 if self.timeout > 0.0:
                     elapsed = time.time() - start_time
                     if elapsed > self.timeout:
                         self.exit_status = "%s Exiting due to timeout. Total 
sent= %d, total rcvd= %d" % \
-                                            (self.prefix, total_sent, 
total_rcvd)
+                                           (self.prefix, total_sent, 
total_rcvd)
                         break
                 for key, mask in sel.select(timeout=0.1):
                     sock = key.fileobj
@@ -170,48 +172,49 @@ class TcpEchoClient():
                                     for idxc in range(self.count):
                                         for idxs in range(self.size):
                                             ob = payload_out[idxc][idxs]
-                                            ib = payload_in [idxc][idxs]
+                                            ib = payload_in[idxc][idxs]
                                             if ob != ib:
-                                                self.error = "%s ERROR Rcvd 
message verify fail. row:%d, col:%d, expected:%s, actual:%s" \
-                                                        % (self.prefix, idxc, 
idxs, repr(ob), repr(ib))
+                                                self.error = "%s ERROR Rcvd 
message verify fail. row:%d, col:%d, " \
+                                                             "expected:%s, 
actual:%s" \
+                                                             % (self.prefix, 
idxc, idxs, repr(ob), repr(ib))
                                                 break
                                 else:
                                     out_ready_to_send = True
                                     sel.modify(sock, selectors.EVENT_READ | 
selectors.EVENT_WRITE)
                             elif len(payload_in[in_list_idx]) > self.size:
                                 self.error = "ERROR Received message too big. 
Expected:%d, actual:%d" % \
-                                              (self.size, 
len(payload_in[in_list_idx]))
+                                             (self.size, 
len(payload_in[in_list_idx]))
                                 break
                             else:
-                                pass # still accumulating a message
+                                pass  # still accumulating a message
                         else:
                             # socket closed
                             self.keep_running = False
                     if self.keep_running and mask & selectors.EVENT_WRITE:
                         if out_ready_to_send:
-                            n_sent = self.sock.send( 
payload_out[out_list_idx][out_byte_idx:] )
+                            n_sent = 
self.sock.send(payload_out[out_list_idx][out_byte_idx:])
                             total_sent += n_sent
                             out_byte_idx += n_sent
                             if out_byte_idx == self.size:
                                 self.logger.log("%s Sent message %d" % 
(self.prefix, out_list_idx))
                                 out_byte_idx = 0
                                 out_list_idx += 1
-                                sel.modify(self.sock, selectors.EVENT_READ) # 
turn off write events
-                                out_ready_to_send = False # turn on when rcvr 
receives
+                                sel.modify(self.sock, selectors.EVENT_READ)  # 
turn off write events
+                                out_ready_to_send = False  # turn on when rcvr 
receives
                         else:
-                            pass # logger.log("DEBUG: ignoring EVENT_WRITE")
+                            pass  # logger.log("DEBUG: ignoring EVENT_WRITE")
 
             # shut down
             sel.unregister(self.sock)
             self.sock.close()
 
-        except Exception as exc:
+        except Exception:
             self.error = "ERROR: exception : '%s'" % traceback.format_exc()
 
         self.is_running = False
 
     def wait(self, timeout=TIMEOUT):
-        self.logger.log("%s Client is shutting down" % (self.prefix))
+        self.logger.log("%s Client is shutting down" % self.prefix)
         self.keep_running = False
         self._thread.join(timeout)
 
@@ -266,14 +269,14 @@ def main(argv):
     if args.timeout < 0.0:
         raise Exception("Timeout must be greater than or equal to zero")
 
-    killer = GracefulKiller()
-    client = None
+    signaller = GracefulExitSignaler()
+    logger = None
 
     try:
         # logging
-        logger = Logger(title = "%s host:%s port %d size:%d count:%d" % 
(prefix, host, port, size, count),
-                        print_to_console = args.log,
-                        save_for_dump = False)
+        logger = Logger(title="%s host:%s port %d size:%d count:%d" % (prefix, 
host, port, size, count),
+                        print_to_console=args.log,
+                        save_for_dump=False)
 
         client = TcpEchoClient(prefix, host, port, size, count, args.timeout, 
logger)
 
@@ -287,18 +290,20 @@ def main(argv):
             if client.exit_status is not None:
                 logger.log("%s Client stopped with status: %s" % (prefix, 
client.exit_status))
                 keep_running = False
-            if killer.kill_now:
-                logger.log("%s Process killed with signal" % (prefix))
+            if signaller.kill_now:
+                logger.log("%s Process killed with signal" % prefix)
                 keep_running = False
             if keep_running and not client.is_running:
-                logger.log("%s Client stopped with no error or status" % 
(prefix))
+                logger.log("%s Client stopped with no error or status" % 
prefix)
                 keep_running = False
 
-    except Exception as e:
-        logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
+    except Exception:
+        if logger is not None:
+            logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
         retval = 1
 
     return retval
 
+
 if __name__ == "__main__":
     sys.exit(main(sys.argv))
diff --git a/tests/TCP_echo_server.py b/tests/TCP_echo_server.py
index e74c780..b3c3ba0 100755
--- a/tests/TCP_echo_server.py
+++ b/tests/TCP_echo_server.py
@@ -38,6 +38,7 @@ import types
 from system_test import Logger
 from system_test import TIMEOUT
 
+
 class ClientRecord(object):
     """
     Object to register with the selector 'data' field
@@ -59,14 +60,15 @@ class ClientRecord(object):
         return self.__repr__()
 
 
-class GracefulKiller:
-  kill_now = False
-  def __init__(self):
-    signal.signal(signal.SIGINT, self.exit_gracefully)
-    signal.signal(signal.SIGTERM, self.exit_gracefully)
+class GracefulExitSignaler:
+    kill_now = False
+
+    def __init__(self):
+        signal.signal(signal.SIGINT, self.exit_gracefully)
+        signal.signal(signal.SIGTERM, self.exit_gracefully)
 
-  def exit_gracefully(self,signum, frame):
-    self.kill_now = True
+    def exit_gracefully(self, signum, frame):
+        self.kill_now = True
 
 
 def split_chunk_for_display(raw_bytes):
@@ -84,9 +86,10 @@ def split_chunk_for_display(raw_bytes):
     return result
 
 
-class TcpEchoServer():
+class TcpEchoServer:
+
     def __init__(self, prefix="ECHO_SERVER", port="0", echo_count=0, 
timeout=0.0, logger=None):
-        '''
+        """
         Start echo server in separate thread
 
         :param prefix: log prefix
@@ -95,10 +98,10 @@ class TcpEchoServer():
         :param timeout: exit after this many seconds
         :param logger: Logger() object
         :return:
-        '''
+        """
         self.sock = None
         self.prefix = prefix
-        self.port = port
+        self.port = int(port)
         self.echo_count = echo_count
         self.timeout = timeout
         self.logger = logger
@@ -128,10 +131,10 @@ class TcpEchoServer():
                 self.sock.bind((self.HOST, self.port))
                 self.sock.listen()
                 self.sock.setblocking(False)
-                self.logger.log('%s Listening on host:%s, port:%d' % 
(self.prefix, self.HOST, self.port))
-            except Exception as exc:
-                self.error = ('%s Opening listen socket %s:%d exception: %s' %
-                           (self.prefix, self.HOST, self.port, 
traceback.format_exc()))
+                self.logger.log('%s Listening on host:%s, port:%s' % 
(self.prefix, self.HOST, self.port))
+            except Exception:
+                self.error = ('%s Opening listen socket %s:%s exception: %s' %
+                              (self.prefix, self.HOST, self.port, 
traceback.format_exc()))
                 self.logger.log(self.error)
                 return 1
 
@@ -160,16 +163,16 @@ class TcpEchoServer():
                             if key.fileobj is self.sock:
                                 self.do_accept(key.fileobj, sel, self.logger)
                             else:
-                                pass # Only listener 'sock' has None in opaque 
data field
+                                pass  # Only listener 'sock' has None in 
opaque data field
                         else:
                             total_echoed += self.do_service(key, mask, sel, 
self.logger)
                 else:
-                    pass # select timeout. probably.
+                    pass   # select timeout. probably.
 
             sel.unregister(self.sock)
             self.sock.close()
 
-        except Exception as exc:
+        except Exception:
             self.error = "ERROR: exception : '%s'" % traceback.format_exc()
 
         self.is_running = False
@@ -188,15 +191,15 @@ class TcpEchoServer():
         if mask & selectors.EVENT_READ:
             try:
                 recv_data = sock.recv(1024)
-            except Exception as exc:
-                logger.log('%s Connection to %s:%d closed by peer' %
-                           (self.prefix, data.addr[0], data.addr[1]))
+            except IOError:
+                logger.log('%s Connection to %s:%d IOError: %s' %
+                           (self.prefix, data.addr[0], data.addr[1], 
traceback.format_exc()))
                 sel.unregister(sock)
                 sock.close()
                 return 0
-            except Exception as exc:
+            except Exception:
                 self.error = ('%s Connection to %s:%d exception: %s' %
-                           (self.prefix, data.addr[0], data.addr[1], 
traceback.format_exc()))
+                              (self.prefix, data.addr[0], data.addr[1], 
traceback.format_exc()))
                 logger.log(self.error)
                 sel.unregister(sock)
                 sock.close()
@@ -204,7 +207,7 @@ class TcpEchoServer():
             if recv_data:
                 data.outb += recv_data
                 logger.log('%s read from: %s:%d len:%d: %s' % (self.prefix, 
data.addr[0], data.addr[1], len(recv_data),
-                                                            
split_chunk_for_display(recv_data)))
+                                                               
split_chunk_for_display(recv_data)))
                 sel.modify(sock, selectors.EVENT_READ | selectors.EVENT_WRITE, 
data=data)
             else:
                 logger.log('%s Closing connection to %s:%d' % (self.prefix, 
data.addr[0], data.addr[1]))
@@ -213,11 +216,25 @@ class TcpEchoServer():
                 return 0
         if mask & selectors.EVENT_WRITE:
             if data.outb:
-                sent = sock.send(data.outb)
+                try:
+                    sent = sock.send(data.outb)
+                except IOError:
+                    logger.log('%s Connection to %s:%d IOError: %s' %
+                               (self.prefix, data.addr[0], data.addr[1], 
traceback.format_exc()))
+                    sel.unregister(sock)
+                    sock.close()
+                    return 0
+                except Exception:
+                    self.error = ('%s Connection to %s:%d exception: %s' %
+                                  (self.prefix, data.addr[0], data.addr[1], 
traceback.format_exc()))
+                    logger.log(self.error)
+                    sel.unregister(sock)
+                    sock.close()
+                    return 1
                 retval += sent
                 if sent > 0:
                     logger.log('%s write to : %s:%d len:%d: %s' % 
(self.prefix, data.addr[0], data.addr[1], sent,
-                                                                
split_chunk_for_display(data.outb[:sent])))
+                                                                   
split_chunk_for_display(data.outb[:sent])))
                 else:
                     logger.log('%s write to : %s:%d len:0' % (self.prefix, 
data.addr[0], data.addr[1]))
                 data.outb = data.outb[sent:]
@@ -226,13 +243,14 @@ class TcpEchoServer():
         return retval
 
     def wait(self, timeout=TIMEOUT):
-        self.logger.log("%s Server is shutting down" % (self.prefix))
+        self.logger.log("%s Server is shutting down" % self.prefix)
         self.keep_running = False
         self._thread.join(timeout)
 
 
 def main(argv):
     retval = 0
+    logger = None
     # parse args
     p = argparse.ArgumentParser()
     p.add_argument('--port', '-p',
@@ -252,7 +270,7 @@ def main(argv):
     # port
     if args.port is None:
         raise Exception("User must specify a port number")
-    port = int(args.port)
+    port = args.port
 
     # name / prefix
     prefix = args.name if args.name is not None else "ECHO_SERVER (%s)" % 
(str(port))
@@ -265,14 +283,14 @@ def main(argv):
     if args.timeout < 0.0:
         raise Exception("Timeout must be greater than or equal to zero")
 
-    killer = GracefulKiller()
+    signaller = GracefulExitSignaler()
     server = None
 
     try:
         # logging
-        logger = Logger(title = "%s port %d" % (prefix, port),
-                        print_to_console = args.log,
-                        save_for_dump = False)
+        logger = Logger(title="%s port %s" % (prefix, port),
+                        print_to_console=args.log,
+                        save_for_dump=False)
 
         server = TcpEchoServer(prefix, port, args.echo, args.timeout, logger)
 
@@ -286,16 +304,16 @@ def main(argv):
             if server.exit_status is not None:
                 logger.log("%s Server stopped with status: %s" % (prefix, 
server.exit_status))
                 keep_running = False
-            if killer.kill_now:
-                logger.log("%s Process killed with signal" % (prefix))
+            if signaller.kill_now:
+                logger.log("%s Process killed with signal" % prefix)
                 keep_running = False
             if keep_running and not server.is_running:
-                logger.log("%s Server stopped with no error or status" % 
(prefix))
+                logger.log("%s Server stopped with no error or status" % 
prefix)
                 keep_running = False
 
-
-    except Exception as e:
-        logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
+    except Exception:
+        if logger is not None:
+            logger.log("%s Exception: %s" % (prefix, traceback.format_exc()))
         retval = 1
 
     if server is not None and server.sock is not None:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to