Moved binding/listening on the server socket from barrier.barrier.run_server() to a separate class (barrier.listen_server). Added a "listen_server" keyword argument to barrier so that callers can pass in a listen_server instance, which barrier then uses whenever it needs access to the server socket. When no listen_server instance is given, barrier creates one in run_server() and closes it when run_server() finishes (preserving the old semantics). Updated barriertest (used by the server-side profiler support code) to reuse a single listen_server instance in almost all of its barrier instances; this will hopefully fix an issue where we rebind too quickly on the same port while packets from an old connection are still in transit, causing an ECONNRESET on the new connection. Updated unittests.
Signed-off-by: Mihai Rusu <[email protected]> --- autotest/client/common_lib/barrier.py 2010-03-15 16:03:28.000000000 -0700 +++ autotest/client/common_lib/barrier.py 2010-03-15 16:03:28.000000000 -0700 @@ -2,6 +2,50 @@ from time import time, sleep from autotest_lib.client.common_lib import error +# default barrier port +_DEFAULT_PORT = 11922 + +class listen_server(object): + """ + Manages a listening socket for barrier. + + Can be used to run multiple barrier instances with the same listening + socket (if they were going to listen on the same port). + + Attributes: + @attribute address: address where to bind (string) + @attribute port: port where to bind + @attribute socket: listening socket object + """ + def __init__(self, address='', port=_DEFAULT_PORT): + """Create a listen_server instance for the given address/port. + + @param address: on what address to listen to + @param port: on what port to listen to + """ + self.address = address + self.port = port + self.socket = self._setup() + + + def _setup(self): + """ + Create, bind and listen on the listening socket. + """ + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((self.address, self.port)) + sock.listen(10) + + return sock + + + def close(self): + """ + Close the listening socket. + """ + self.socket.close() + class barrier(object): """ Multi-machine barrier support @@ -95,12 +139,28 @@ Clients who have checked in and are waiting (master) masterid Hostname/IP address + optional tag of selected master + server + External listen server instance to use instead of creating + our own. Prefer to create a listen_server instance and + reuse it in multiple barrier instances so that the barrier + code doesn't have to re-bind on the same port very fast + (and if packets are in transit they may reset new + connections). 
""" - def __init__(self, hostid, tag, timeout=None, port=11922): + def __init__(self, hostid, tag, timeout=None, port=None, + listen_server=None): self.hostid = hostid self.tag = tag - self.port = port + if listen_server: + if port: + raise error.BarrierError( + 'Received both "port" and "listen_server" arguments ' + '(provide only one of them)') + self.port = listen_server.port + else: + self.port = port or _DEFAULT_PORT + self.server = listen_server self.timeout = timeout self.start = None logging.info("tag=%s port=%d timeout=%r", @@ -303,18 +363,14 @@ def run_server(self, is_master): - self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.server.bind(('', self.port)) - self.server.listen(10) - + server = self.server or listen_server(port=self.port) failed = 0 try: while 1: try: # Wait for callers welcoming each. - self.server.settimeout(self.remaining()) - connection = self.server.accept() + server.socket.settimeout(self.remaining()) + connection = server.socket.accept() if is_master: self.master_welcome(connection) else: @@ -336,13 +392,12 @@ logging.info("slave connected to master") self.slave_wait() break - - self.waiting_close() - self.server.close() - except: + finally: self.waiting_close() - self.server.close() - raise + # if we created the listening_server in the beginning of this + # function then close the listening socket here + if not self.server: + server.close() def run_client(self, is_master): --- autotest/client/common_lib/barrier_unittest.py 2010-03-15 16:03:28.000000000 -0700 +++ autotest/client/common_lib/barrier_unittest.py 2010-03-15 16:03:28.000000000 -0700 @@ -9,6 +9,23 @@ from autotest_lib.client.common_lib.test_utils import mock +class listen_server_test(unittest.TestCase): + + def test_init(self): + server = barrier.listen_server() + server.close() + + + def test_close(self): + server = barrier.listen_server() + # cannot bind on the same port again + 
self.assertRaises(socket.error, barrier.listen_server) + server.close() + # now we can + server = barrier.listen_server() + server.close() + + class barrier_test(unittest.TestCase): def setUp(self): @@ -137,9 +154,13 @@ # Internal utility function (not a unit test) def rendezvous_test(self, timeout, port=11922, rendezvous_servers=False, test_abort=False, - abort=False): + abort=False, listen_server=None): + if listen_server: + port = None + def _rdv(addr): - b1 = barrier.barrier(addr, "test_meeting", timeout, port) + b1 = barrier.barrier(addr, "test_meeting", timeout, port, + listen_server=listen_server) if not rendezvous_servers: if test_abort: b1.rendezvous('127.0.0.1#0', '127.0.0.1#1', abort=abort) @@ -158,8 +179,7 @@ try: _rdv(addr) except error.BarrierError: - if timeout == 0: - pass + pass client = threading.Thread(target=_thread_rdv, args=('127.0.0.1#0',)) @@ -168,5 +188,15 @@ client.join() + def test_reusing_listen_server(self): + """ + Test that reusing the same listen server object works. 
+ """ + server = barrier.listen_server() + self.rendezvous_test(10, listen_server=server) + self.rendezvous_test(10, listen_server=server) + self.rendezvous_test(10, listen_server=server) + + if __name__ == "__main__": unittest.main() --- autotest/client/tests/barriertest/barriertest.py 2010-03-15 16:03:28.000000000 -0700 +++ autotest/client/tests/barriertest/barriertest.py 2010-03-15 16:03:28.000000000 -0700 @@ -1,5 +1,6 @@ import time from autotest_lib.client.bin import test +from autotest_lib.client.common_lib import barrier class barriertest(test.test): version = 1 @@ -9,12 +10,13 @@ hostid, masterid, all_ids): profilers = self.job.profilers + barrier_server = barrier.listen_server(port=11920) b0 = self.job.barrier(hostid, "sync_profilers", timeout_start, - port=11920) + listen_server=barrier_server) b0.rendezvous_servers(masterid, hostid) b1 = self.job.barrier(hostid, "start_profilers", timeout_start, - port=11920) + listen_server=barrier_server) b1.rendezvous_servers(masterid, hostid) b2 = self.job.barrier(hostid, "local_sync_profilers", timeout_sync) @@ -23,12 +25,14 @@ profilers.start(self) b3 = self.job.barrier(hostid, "stop_profilers", timeout_stop, - port=11920) + listen_server=barrier_server) b3.rendezvous_servers(masterid, hostid) profilers.stop(self) profilers.report(self) b4 = self.job.barrier(hostid, "finish_profilers", timeout_stop, - port=11920) + listen_server=barrier_server) b4.rendezvous_servers(masterid, hostid) + + barrier_server.close() _______________________________________________ Autotest mailing list [email protected] http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
