IMPORTANT: You MUST "svn copy barrier.py base_barrier.py" and
                    "svn copy barrier_unittest.py base_barrier_unittest.py"
           in the client/common_lib/ directory before applying this patch.


barrier cleanups:
* Rename barrier.py to base_barrier.py and add a barrier.py stub
  that imports listen_server and barrier from base_barrier and
  overrides them with the site_barrier versions if found.
  barrier_unittest.py is renamed to base_barrier_unittest.py to match.
* Move BarrierAbortError to the error module with everything else.
* Add a rendezvous_servers abort=True from the server test case.
* Move get_sync_control_file() from common_lib.utils to
  server.base_utils, where it belongs, to avoid a circular import
  (utils importing barrier, which now imports utils).

Signed-off-by: Gregory Smith <[email protected]>

--- autotest/client/common_lib/barrier.py       2010-04-13 16:00:38.000000000 -0700
+++ autotest/client/common_lib/barrier.py       2010-04-13 18:15:53.000000000 -0700
@@ -1,543 +1,8 @@
-import sys, socket, errno, logging
-from time import time, sleep
-from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib.base_barrier import listen_server, barrier
+from autotest_lib.client.common_lib import utils
 
-# default barrier port
-_DEFAULT_PORT = 11922
-
-
-class BarrierAbortError(error.BarrierError):
-    """Special BarrierError raised when an explicit abort is requested."""
-
-
-class listen_server(object):
-    """
-    Manages a listening socket for barrier.
-
-    Can be used to run multiple barrier instances with the same listening
-    socket (if they were going to listen on the same port).
-
-    Attributes:
-
-    @attr address: Address to bind to (string).
-    @attr port: Port to bind to.
-    @attr socket: Listening socket object.
-    """
-    def __init__(self, address='', port=_DEFAULT_PORT):
-        """
-        Create a listen_server instance for the given address/port.
-
-        @param address: The address to listen on.
-        @param port: The port to listen on.
-        """
-        self.address = address
-        self.port = port
-        self.socket = self._setup()
-
-
-    def _setup(self):
-        """Create, bind and listen on the listening socket."""
-        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        sock.bind((self.address, self.port))
-        sock.listen(10)
-
-        return sock
-
-
-    def close(self):
-        """Close the listening socket."""
-        self.socket.close()
-
-
-class barrier(object):
-    """Multi-machine barrier support.
-
-    Provides multi-machine barrier mechanism.
-    Execution stops until all members arrive at the barrier.
-
-    Implementation Details:
-    .......................
-
-    When a barrier is forming the master node (first in sort order) in the
-    set accepts connections from each member of the set.  As they arrive
-    they indicate the barrier they are joining and their identifier (their
-    hostname or IP address and optional tag).  They are then asked to wait.
-    When all members are present the master node then checks that each
-    member is still responding via a ping/pong exchange.  If this is
-    successful then everyone has checked in at the barrier.  We then tell
-    everyone they may continue via a rlse message.
-
-    Where the master is not the first to reach the barrier the client
-    connects will fail.  Client will retry until they either succeed in
-    connecting to master or the overall timeout is exceeded.
-
-    As an example here is the exchange for a three node barrier called
-    'TAG'
-
-      MASTER                        CLIENT1         CLIENT2
-        <-------------TAG C1-------------
-        --------------wait-------------->
-                      [...]
-        <-------------TAG C2-----------------------------
-        --------------wait------------------------------>
-                      [...]
-        --------------ping-------------->
-        <-------------pong---------------
-        --------------ping------------------------------>
-        <-------------pong-------------------------------
-                ----- BARRIER conditions MET -----
-        --------------rlse-------------->
-        --------------rlse------------------------------>
-
-    Note that once the last client has responded to pong the barrier is
-    implicitly deemed satisifed, they have all acknowledged their presence.
-    If we fail to send any of the rlse messages the barrier is still a
-    success, the failed host has effectively broken 'right at the beginning'
-    of the post barrier execution window.
-
-    In addition, there is another rendezvous, that makes each slave a server
-    and the master a client.  The connection process and usage is still the
-    same but allows barriers from machines that only have a one-way
-    connection initiation.  This is called rendezvous_servers.
-
-    For example:
-        if ME == SERVER:
-            server start
-
-        b = job.barrier(ME, 'server-up', 120)
-        b.rendezvous(CLIENT, SERVER)
-
-        if ME == CLIENT:
-            client run
-
-        b = job.barrier(ME, 'test-complete', 3600)
-        b.rendezvous(CLIENT, SERVER)
-
-        if ME == SERVER:
-            server stop
-
-    Any client can also request an abort of the job by setting
-    abort=True in the rendezvous arguments.
-    """
-
-    def __init__(self, hostid, tag, timeout=None, port=None,
-                 listen_server=None):
-        """
-        @param hostid: My hostname/IP address + optional tag.
-        @param tag: Symbolic name of the barrier in progress.
-        @param timeout: Maximum seconds to wait for a the barrier to meet.
-        @param port: Port number to listen on.
-        @param listen_server: External listen_server instance to use instead
-                of creating our own.  Create a listen_server instance and
-                reuse it across multiple barrier instances so that the
-                barrier code doesn't try to quickly re-bind on the same port
-                (packets still in transit for the previous barrier they may
-                reset new connections).
-        """
-        self._hostid = hostid
-        self._tag = tag
-        if listen_server:
-            if port:
-                raise error.BarrierError(
-                        '"port" and "listen_server" are mutually exclusive.')
-            self._port = listen_server.port
-        else:
-            self._port = port or _DEFAULT_PORT
-        self._server = listen_server  # A listen_server instance or None.
-        self._members = []  # List of hosts we expect to find at the barrier.
-        self._timeout_secs = timeout
-        self._start_time = None  # Timestamp of when we started waiting.
-        self._masterid = None  # Host/IP + optional tag of selected master.
-        logging.info("tag=%s port=%d timeout=%r",
-                     self._tag, self._port, self._timeout_secs)
-
-        # Number of clients seen (should be the length of self._waiting).
-        self._seen = 0
-
-        # Clients who have checked in and are waiting (if we are a master).
-        self._waiting = {}  # Maps from hostname -> (client, addr) tuples.
-
-
-    def _get_host_from_id(self, hostid):
-        # Remove any trailing local identifier following a #.
-        # This allows multiple members per host which is particularly
-        # helpful in testing.
-        if not hostid.startswith('#'):
-            return hostid.split('#')[0]
-        else:
-            raise error.BarrierError(
-                    "Invalid Host id: Host Address should be specified")
-
-
-    def _update_timeout(self, timeout):
-        if timeout is not None and self._start_time is not None:
-            self._timeout_secs = (time() - self._start_time) + timeout
-        else:
-            self._timeout_secs = timeout
-
-
-    def _remaining(self):
-        if self._timeout_secs is not None and self._start_time is not None:
-            timeout = self._timeout_secs - (time() - self._start_time)
-            if timeout <= 0:
-                errmsg = "timeout waiting for barrier: %s" % self._tag
-                logging.error(error)
-                raise error.BarrierError(errmsg)
-        else:
-            timeout = self._timeout_secs
-
-        if self._timeout_secs is not None:
-            logging.info("seconds remaining: %d", timeout)
-        return timeout
-
-
-    def _master_welcome(self, connection):
-        client, addr = connection
-        name = None
-
-        client.settimeout(5)
-        try:
-            # Get the clients name.
-            intro = client.recv(1024)
-            intro = intro.strip("\r\n")
-
-            intro_parts = intro.split(' ', 2)
-            if len(intro_parts) != 2:
-                logging.warn("Ignoring invalid data from %s: %r",
-                             client.getpeername(), intro)
-                client.close()
-                return
-            tag, name = intro_parts
-
-            logging.info("new client tag=%s, name=%s", tag, name)
-
-            # Ok, we know who is trying to attach.  Confirm that
-            # they are coming to the same meeting.  Also, everyone
-            # should be using a unique handle (their IP address).
-            # If we see a duplicate, something _bad_ has happened
-            # so drop them now.
-            if self._tag != tag:
-                logging.warn("client arriving for the wrong barrier: %s != %s",
-                             self._tag, tag)
-                client.settimeout(5)
-                client.send("!tag")
-                client.close()
-                return
-            elif name in self._waiting:
-                logging.warn("duplicate client")
-                client.settimeout(5)
-                client.send("!dup")
-                client.close()
-                return
-
-            # Acknowledge the client
-            client.send("wait")
-
-        except socket.timeout:
-            # This is nominally an error, but as we do not know
-            # who that was we cannot do anything sane other
-            # than report it and let the normal timeout kill
-            # us when thats appropriate.
-            logging.warn("client handshake timeout: (%s:%d)",
-                         addr[0], addr[1])
-            client.close()
-            return
-
-        logging.info("client now waiting: %s (%s:%d)",
-                     name, addr[0], addr[1])
-
-        # They seem to be valid record them.
-        self._waiting[name] = connection
-        self._seen += 1
-
-
-    def _slave_hello(self, connection):
-        (client, addr) = connection
-        name = None
-
-        client.settimeout(5)
-        try:
-            client.send(self._tag + " " + self._hostid)
-
-            reply = client.recv(4)
-            reply = reply.strip("\r\n")
-            logging.info("master said: %s", reply)
-
-            # Confirm the master accepted the connection.
-            if reply != "wait":
-                logging.warn("Bad connection request to master")
-                client.close()
-                return
-
-        except socket.timeout:
-            # This is nominally an error, but as we do not know
-            # who that was we cannot do anything sane other
-            # than report it and let the normal timeout kill
-            # us when thats appropriate.
-            logging.error("master handshake timeout: (%s:%d)",
-                          addr[0], addr[1])
-            client.close()
-            return
-
-        logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])
-
-        # They seem to be valid record them.
-        self._waiting[self._hostid] = connection
-        self._seen = 1
-
-
-    def _master_release(self):
-        # Check everyone is still there, that they have not
-        # crashed or disconnected in the meantime.
-        allpresent = True
-        abort = self._abort
-        for name in self._waiting:
-            (client, addr) = self._waiting[name]
-
-            logging.info("checking client present: %s", name)
-
-            client.settimeout(5)
-            reply = 'none'
-            try:
-                client.send("ping")
-                reply = client.recv(1024)
-            except socket.timeout:
-                logging.warn("ping/pong timeout: %s", name)
-                pass
-
-            if reply == 'abrt':
-                logging.warn("Client %s requested abort", name)
-                abort = True
-            elif reply != "pong":
-                allpresent = False
-
-        if not allpresent:
-            raise error.BarrierError("master lost client")
-
-        if abort:
-            logging.info("Aborting the clients")
-            msg = 'abrt'
-        else:
-            logging.info("Releasing clients")
-            msg = 'rlse'
-
-        # If every ones checks in then commit the release.
-        for name in self._waiting:
-            (client, addr) = self._waiting[name]
-
-            client.settimeout(5)
-            try:
-                client.send(msg)
-            except socket.timeout:
-                logging.warn("release timeout: %s", name)
-                pass
-
-        if abort:
-            raise BarrierAbortError("Client requested abort")
-
-
-    def _waiting_close(self):
-        # Either way, close out all the clients.  If we have
-        # not released them then they know to abort.
-        for name in self._waiting:
-            (client, addr) = self._waiting[name]
-
-            logging.info("closing client: %s", name)
-
-            try:
-                client.close()
-            except:
-                pass
-
-
-    def _run_server(self, is_master):
-        server = self._server or listen_server(port=self._port)
-        failed = 0
-        try:
-            while True:
-                try:
-                    # Wait for callers welcoming each.
-                    server.socket.settimeout(self._remaining())
-                    connection = server.socket.accept()
-                    if is_master:
-                        self._master_welcome(connection)
-                    else:
-                        self._slave_hello(connection)
-                except socket.timeout:
-                    logging.warn("timeout waiting for remaining clients")
-                    pass
-
-                if is_master:
-                    # Check if everyone is here.
-                    logging.info("master seen %d of %d",
-                                 self._seen, len(self._members))
-                    if self._seen == len(self._members):
-                        self._master_release()
-                        break
-                else:
-                    # Check if master connected.
-                    if self._seen:
-                        logging.info("slave connected to master")
-                        self._slave_wait()
-                        break
-        finally:
-            self._waiting_close()
-            # if we created the listening_server in the beginning of this
-            # function then close the listening socket here
-            if not self._server:
-                server.close()
-
-
-    def _run_client(self, is_master):
-        while self._remaining() is None or self._remaining() > 0:
-            try:
-                remote = socket.socket(socket.AF_INET,
-                        socket.SOCK_STREAM)
-                remote.settimeout(30)
-                if is_master:
-                    # Connect to all slaves.
-                    host = self._get_host_from_id(self._members[self._seen])
-                    logging.info("calling slave: %s", host)
-                    connection = (remote, (host, self._port))
-                    remote.connect(connection[1])
-                    self._master_welcome(connection)
-                else:
-                    # Just connect to the master.
-                    host = self._get_host_from_id(self._masterid)
-                    logging.info("calling master")
-                    connection = (remote, (host, self._port))
-                    remote.connect(connection[1])
-                    self._slave_hello(connection)
-            except socket.timeout:
-                logging.warn("timeout calling host, retry")
-                sleep(10)
-                pass
-            except socket.error, err:
-                (code, str) = err
-                if (code != errno.ECONNREFUSED):
-                    raise
-                sleep(10)
-
-            if is_master:
-                # Check if everyone is here.
-                logging.info("master seen %d of %d",
-                             self._seen, len(self._members))
-                if self._seen == len(self._members):
-                    self._master_release()
-                    break
-            else:
-                # Check if master connected.
-                if self._seen:
-                    logging.info("slave connected to master")
-                    self._slave_wait()
-                    break
-
-        self._waiting_close()
-
-
-    def _slave_wait(self):
-        remote = self._waiting[self._hostid][0]
-        mode = "wait"
-        while True:
-            # All control messages are the same size to allow
-            # us to split individual messages easily.
-            remote.settimeout(self._remaining())
-            reply = remote.recv(4)
-            if not reply:
-                break
-
-            reply = reply.strip("\r\n")
-            logging.info("master said: %s", reply)
-
-            mode = reply
-            if reply == "ping":
-                # Ensure we have sufficient time for the
-                # ping/pong/rlse cyle to complete normally.
-                self._update_timeout(10 + 10 * len(self._members))
-
-                if self._abort:
-                    msg = "abrt"
-                else:
-                    msg = "pong"
-                logging.info(msg)
-                remote.settimeout(self._remaining())
-                remote.send(msg)
-
-            elif reply == "rlse" or reply == "abrt":
-                # Ensure we have sufficient time for the
-                # ping/pong/rlse cyle to complete normally.
-                self._update_timeout(10 + 10 * len(self._members))
-
-                logging.info("was released, waiting for close")
-
-        if mode == "rlse":
-            pass
-        elif mode == "wait":
-            raise error.BarrierError("master abort -- barrier timeout")
-        elif mode == "ping":
-            raise error.BarrierError("master abort -- client lost")
-        elif mode == "!tag":
-            raise error.BarrierError("master abort -- incorrect tag")
-        elif mode == "!dup":
-            raise error.BarrierError("master abort -- duplicate client")
-        elif mode == "abrt":
-            raise BarrierAbortError("Client requested abort")
-        else:
-            raise error.BarrierError("master handshake failure: " + mode)
-
-
-    def rendezvous(self, *hosts, **dargs):
-        # if called with abort=True, this will raise an exception
-        # on all the clients.
-        self._start_time = time()
-        self._members = list(hosts)
-        self._members.sort()
-        self._masterid = self._members.pop(0)
-        self._abort = dargs.get('abort', False)
-
-        logging.info("masterid: %s", self._masterid)
-        if self._abort:
-            logging.debug("%s is aborting", self._hostid)
-        if not len(self._members):
-            logging.info("No other members listed.")
-            return
-        logging.info("members: %s", ",".join(self._members))
-
-        self._seen = 0
-        self._waiting = {}
-
-        # Figure out who is the master in this barrier.
-        if self._hostid == self._masterid:
-            logging.info("selected as master")
-            self._run_server(is_master=True)
-        else:
-            logging.info("selected as slave")
-            self._run_client(is_master=False)
-
-
-    def rendezvous_servers(self, masterid, *hosts, **dargs):
-        # if called with abort=True, this will raise an exception
-        # on all the clients.
-        self._start_time = time()
-        self._members = list(hosts)
-        self._members.sort()
-        self._masterid = masterid
-        self._abort = dargs.get('abort', False)
-
-        logging.info("masterid: %s", self._masterid)
-        if not len(self._members):
-            logging.info("No other members listed.")
-            return
-        logging.info("members: %s", ",".join(self._members))
-
-        self._seen = 0
-        self._waiting = {}
-
-        # Figure out who is the master in this barrier.
-        if self._hostid == self._masterid:
-            logging.info("selected as master")
-            self._run_client(is_master=True)
-        else:
-            logging.info("selected as slave")
-            self._run_server(is_master=False)
+_SITE_MODULE_NAME = 'autotest_lib.client.common_lib.site_barrier'
+listen_server = utils.import_site_symbol(
+        __file__, _SITE_MODULE_NAME, 'listen_server', listen_server)
+barrier = utils.import_site_symbol(
+        __file__, _SITE_MODULE_NAME, 'barrier', barrier)
==== (deleted) //depot/google_vendor_src_branch/autotest/client/common_lib/barrier_unittest.py ====
--- autotest/client/common_lib/barrier_unittest.py      2010-04-13 16:00:38.000000000 -0700
+++ /dev/null   2009-12-17 12:29:38.000000000 -0800
@@ -1,202 +0,0 @@
-#!/usr/bin/python2.4
-
-__author__ = """Ashwin Ganti ([email protected])"""
-
-import os, sys, socket, errno, unittest, threading
-from time import time, sleep
-import common
-from autotest_lib.client.common_lib import error, barrier
-from autotest_lib.client.common_lib.test_utils import mock
-
-
-class listen_server_test(unittest.TestCase):
-
-    def test_init(self):
-        server = barrier.listen_server()
-        server.close()
-
-
-    def test_close(self):
-        server = barrier.listen_server()
-        # cannot bind on the same port again
-        self.assertRaises(socket.error, barrier.listen_server)
-        server.close()
-        # now we can
-        server = barrier.listen_server()
-        server.close()
-
-
-class barrier_test(unittest.TestCase):
-
-    def setUp(self):
-        self.god = mock.mock_god()
-        self.god.mock_io()
-
-
-    def tearDown(self):
-        self.god.unmock_io()
-
-
-    def test_initialize(self):
-        b = barrier.barrier('127.0.0.1#', 'testtag', 100, 11921)
-        self.assertEqual(b._hostid, '127.0.0.1#')
-        self.assertEqual(b._tag, 'testtag')
-        self.assertEqual(b._timeout_secs, 100)
-        self.assertEqual(b._port, 11921)
-
-
-    def test_get_host_from_id(self):
-        b = barrier.barrier('127.0.0.1#', 'testgethost', 100)
-
-        hostname = b._get_host_from_id('my_host')
-        self.assertEqual(hostname, 'my_host')
-
-        hostname = b._get_host_from_id('my_host#')
-        self.assertEqual(hostname, 'my_host')
-
-        self.assertRaises(error.BarrierError, b._get_host_from_id, '#my_host')
-
-
-    def test_update_timeout(self):
-        b = barrier.barrier('127.0.0.1#', 'update', 100)
-        b._update_timeout(120)
-        self.assertEqual(b._timeout_secs, 120)
-
-
-    def test_remaining(self):
-        b = barrier.barrier('127.0.0.1#', 'remain', 100)
-        remain = b._remaining()
-        self.assertEqual(remain, 100)
-
-
-    def test_master_welcome_garbage(self):
-        b = barrier.barrier('127.0.0.1#', 'garbage', 100)
-        waiting_before = dict(b._waiting)
-        seen_before = b._seen
-
-        sender, receiver = socket.socketpair()
-        try:
-            sender.send('GET /foobar?p=-1 HTTP/1.0\r\n\r\n')
-            # This should not raise an exception.
-            b._master_welcome((receiver, 'fakeaddr'))
-
-            self.assertEqual(waiting_before, b._waiting)
-            self.assertEqual(seen_before, b._seen)
-
-            sender, receiver = socket.socketpair()
-            sender.send('abcdefg\x00\x01\x02\n'*5)
-            # This should not raise an exception.
-            b._master_welcome((receiver, 'fakeaddr'))
-
-            self.assertEqual(waiting_before, b._waiting)
-            self.assertEqual(seen_before, b._seen)
-        finally:
-            sender.close()
-            receiver.close()
-
-
-    def test_rendezvous_basic(self):
-        # Basic rendezvous testing
-        self.rendezvous_test(60, port=11920)
-
-
-    def test_rendezvous_timeout(self):
-        # The rendezvous should time out here and throw a
-        # BarrierError since we are specifying a timeout of 0
-        self.assertRaises(error.BarrierError,
-                          self.rendezvous_test, 0, port=11921)
-
-
-    def test_rendezvous_abort_ok(self):
-        # Test with abort flag set to not abort.
-        self.rendezvous_test(60, port=11920,
-                             test_abort=True, abort=False)
-
-
-    def test_rendezvous_abort(self):
-        # The rendezvous should abort here and throw a
-        # BarrierError since we are asking to abort
-        self.assertRaises(error.BarrierError,
-                          self.rendezvous_test, 0, port=11921,
-                          test_abort=True, abort=True)
-
-
-    def test_rendezvous_servers_basic(self):
-        # The rendezvous should time out here and throw a
-        # BarrierError since we are specifying a timeout of 0
-        self.rendezvous_test(60, port=11921,
-                             rendezvous_servers=True)
-
-
-    def test_rendezvous_servers_timeout(self):
-        # The rendezvous should time out here and throw a
-        # BarrierError since we are specifying a timeout of 0
-        self.assertRaises(error.BarrierError,
-                          self.rendezvous_test, 0, port=11922,
-                          rendezvous_servers=True)
-
-
-    def test_rendezvous_servers_abort_ok(self):
-        # Test with abort flag set to not abort.
-        self.rendezvous_test(60, port=11920, rendezvous_servers=True,
-                             test_abort=True, abort=False)
-
-
-    def test_rendezvous_servers_abort(self):
-        # The rendezvous should abort here and throw a
-        # BarrierError since we are asking to abort
-        self.assertRaises(error.BarrierError,
-                          self.rendezvous_test, 0, port=11922,
-                          rendezvous_servers=True,
-                          test_abort=True, abort=True)
-
-
-    # Internal utility function (not a unit test)
-    def rendezvous_test(self, timeout, port=11922,
-                        rendezvous_servers=False, test_abort=False,
-                        abort=False, listen_server=None):
-        if listen_server:
-            port = None
-
-        def _rdv(addr):
-            b1 = barrier.barrier(addr, "test_meeting", timeout, port,
-                                 listen_server=listen_server)
-            if not rendezvous_servers:
-                if test_abort:
-                    b1.rendezvous('127.0.0.1#0', '127.0.0.1#1', abort=abort)
-                else:
-                    b1.rendezvous('127.0.0.1#0', '127.0.0.1#1')
-            else:
-                if test_abort:
-                    b1.rendezvous_servers('127.0.0.1#0', '127.0.0.1#1',
-                                          abort=abort)
-                else:
-                    b1.rendezvous_servers('127.0.0.1#0', '127.0.0.1#1')
-
-
-        def _thread_rdv(addr):
-            # We need to ignore the exception on one side.
-            try:
-                _rdv(addr)
-            except error.BarrierError:
-                pass
-
-        client = threading.Thread(target=_thread_rdv,
-                                  args=('127.0.0.1#0',))
-        client.start()
-        _rdv('127.0.0.1#1')
-        client.join()
-
-
-    def test_reusing_listen_server(self):
-        """
-        Test that reusing the same listen server object works.
-        """
-        server = barrier.listen_server()
-        self.rendezvous_test(10, listen_server=server)
-        self.rendezvous_test(10, listen_server=server)
-        self.rendezvous_test(10, listen_server=server)
-
-
-if __name__ == "__main__":
-    unittest.main()
--- autotest/client/common_lib/barrier.py       2010-04-13 16:00:38.000000000 -0700
+++ autotest/client/common_lib/base_barrier.py  2010-04-13 18:15:53.000000000 -0700
@@ -2,12 +2,8 @@
 from time import time, sleep
 from autotest_lib.client.common_lib import error
 
-# default barrier port
-_DEFAULT_PORT = 11922
-
-
-class BarrierAbortError(error.BarrierError):
-    """Special BarrierError raised when an explicit abort is requested."""
+# Default barrier TCP port.
+DEFAULT_PORT = 11922
 
 
 class listen_server(object):
@@ -23,7 +19,7 @@
     @attr port: Port to bind to.
     @attr socket: Listening socket object.
     """
-    def __init__(self, address='', port=_DEFAULT_PORT):
+    def __init__(self, address='', port=DEFAULT_PORT):
         """
         Create a listen_server instance for the given address/port.
 
@@ -143,7 +139,7 @@
                         '"port" and "listen_server" are mutually exclusive.')
             self._port = listen_server.port
         else:
-            self._port = port or _DEFAULT_PORT
+            self._port = port or DEFAULT_PORT
         self._server = listen_server  # A listen_server instance or None.
         self._members = []  # List of hosts we expect to find at the barrier.
         self._timeout_secs = timeout
@@ -334,7 +330,7 @@
                 pass
 
         if abort:
-            raise BarrierAbortError("Client requested abort")
+            raise error.BarrierAbortError("Client requested abort")
 
 
     def _waiting_close(self):
@@ -482,7 +478,7 @@
         elif mode == "!dup":
             raise error.BarrierError("master abort -- duplicate client")
         elif mode == "abrt":
-            raise BarrierAbortError("Client requested abort")
+            raise error.BarrierAbortError("Client requested abort")
         else:
             raise error.BarrierError("master handshake failure: " + mode)
 
--- autotest/client/common_lib/barrier_unittest.py      2010-04-13 16:00:38.000000000 -0700
+++ autotest/client/common_lib/base_barrier_unittest.py 2010-04-13 18:15:53.000000000 -0700
@@ -5,7 +5,8 @@
 import os, sys, socket, errno, unittest, threading
 from time import time, sleep
 import common
-from autotest_lib.client.common_lib import error, barrier
+from autotest_lib.client.common_lib import error, base_barrier
+barrier = base_barrier
 from autotest_lib.client.common_lib.test_utils import mock
 
 
--- autotest/client/common_lib/error.py 2010-04-13 18:15:53.000000000 -0700
+++ autotest/client/common_lib/error.py 2010-04-13 18:15:53.000000000 -0700
@@ -143,6 +143,11 @@
     pass
 
 
+class BarrierAbortError(BarrierError):
+    """Indicate that the barrier was explicitly aborted by a member."""
+    pass
+
+
 class InstallError(JobError):
     """Indicates an installation error which Terminates and fails the job."""
     pass
--- autotest/client/common_lib/utils.py 2010-04-12 09:44:38.000000000 -0700
+++ autotest/client/common_lib/utils.py 2010-04-13 18:15:53.000000000 -0700
@@ -8,7 +8,7 @@
     import hashlib
 except ImportError:
     import md5, sha
-from autotest_lib.client.common_lib import error, barrier, logging_manager
+from autotest_lib.client.common_lib import error, logging_manager
 
 def deprecated(func):
     """This is a decorator which can be used to mark functions as deprecated.
@@ -760,112 +760,6 @@
     return cpu_percent, to_return
 
 
-"""
-This function is used when there is a need to run more than one
-job simultaneously starting exactly at the same time. It basically returns
-a modified control file (containing the synchronization code prepended)
-whenever it is ready to run the control file. The synchronization
-is done using barriers to make sure that the jobs start at the same time.
-
-Here is how the synchronization is done to make sure that the tests
-start at exactly the same time on the client.
-sc_bar is a server barrier and s_bar, c_bar are the normal barriers
-
-                  Job1              Job2         ......      JobN
- Server:   |                        sc_bar
- Server:   |                        s_bar        ......      s_bar
- Server:   |      at.run()         at.run()      ......      at.run()
- ----------|------------------------------------------------------
- Client    |      sc_bar
- Client    |      c_bar             c_bar        ......      c_bar
- Client    |    <run test>         <run test>    ......     <run test>
-
-
-PARAMS:
-   control_file : The control file which to which the above synchronization
-                  code would be prepended to
-   host_name    : The host name on which the job is going to run
-   host_num (non negative) : A number to identify the machine so that we have
-                  different sets of s_bar_ports for each of the machines.
-   instance     : The number of the job
-   num_jobs     : Total number of jobs that are going to run in parallel with
-                  this job starting at the same time
-   port_base    : Port number that is used to derive the actual barrier ports.
-
-RETURN VALUE:
-    The modified control file.
-
-"""
-def get_sync_control_file(control, host_name, host_num,
-                          instance, num_jobs, port_base=63100):
-    sc_bar_port = port_base
-    c_bar_port = port_base
-    if host_num < 0:
-        print "Please provide a non negative number for the host"
-        return None
-    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
-                                          # the same for a given machine
-
-    sc_bar_timeout = 180
-    s_bar_timeout = c_bar_timeout = 120
-
-    # The barrier code snippet is prepended into the conrol file
-    # dynamically before at.run() is called finally.
-    control_new = []
-
-    # jobid is the unique name used to identify the processes
-    # trying to reach the barriers
-    jobid = "%s#%d" % (host_name, instance)
-
-    rendv = []
-    # rendvstr is a temp holder for the rendezvous list of the processes
-    for n in range(num_jobs):
-        rendv.append("'%s#%d'" % (host_name, n))
-    rendvstr = ",".join(rendv)
-
-    if instance == 0:
-        # Do the setup and wait at the server barrier
-        # Clean up the tmp and the control dirs for the first instance
-        control_new.append('if os.path.exists(job.tmpdir):')
-        control_new.append("\t system('umount -f %s > /dev/null"
-                           "2> /dev/null' % job.tmpdir,"
-                           "ignore_status=True)")
-        control_new.append("\t system('rm -rf ' + job.tmpdir)")
-        control_new.append(
-            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
-            % (jobid, sc_bar_timeout, sc_bar_port))
-        control_new.append(
-        'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
-         % jobid)
-
-    elif instance == 1:
-        # Wait at the server barrier to wait for instance=0
-        # process to complete setup
-        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
-                     port=sc_bar_port)
-        b0.rendezvous_servers("PARALLEL_MASTER", jobid)
-
-        if(num_jobs > 2):
-            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
-                         port=s_bar_port)
-            b1.rendezvous(rendvstr)
-
-    else:
-        # For the rest of the clients
-        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
-        b2.rendezvous(rendvstr)
-
-    # Client side barrier for all the tests to start at the same time
-    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
-                    % (jobid, c_bar_timeout, c_bar_port))
-    control_new.append("b1.rendezvous(%s)" % rendvstr)
-
-    # Stick in the rest of the control file
-    control_new.append(control)
-
-    return "\n".join(control_new)
-
-
 def get_arch(run_function=run):
     """
     Get the hardware architecture of the machine.
--- autotest/client/tests/barriertest/barriertest.py    2010-04-13 16:00:38.000000000 -0700
+++ autotest/client/tests/barriertest/barriertest.py    2010-04-13 18:15:53.000000000 -0700
@@ -2,45 +2,69 @@
 
 import logging, time
 from autotest_lib.client.bin import test
-from autotest_lib.client.common_lib import barrier
+from autotest_lib.client.common_lib import barrier, error
 
 
 class barriertest(test.test):
     version = 2
 
 
-    def run_once(self, our_addr, hostnames, master):
+    def run_once(self, our_addr, hostnames, master, timeout=120):
         # A reusable local server as we're using multiple barriers in one test.
         server = barrier.listen_server()
 
         # Basic barrier rendezvous test.
-        self.job.barrier(our_addr, 'First', timeout=60, listen_server=server
-                         ).rendezvous(*hostnames)
+        self.job.barrier(our_addr, 'First', timeout=timeout,
+                         listen_server=server).rendezvous(*hostnames)
         logging.info('1. rendezvous "First" complete.')
         time.sleep(2)
 
         # A rendezvous_servers using a different master than the default.
-        self.job.barrier(our_addr, 'Second', timeout=60, listen_server=server
+        self.job.barrier(our_addr, 'Second', timeout=timeout,
+                         listen_server=server
                          ).rendezvous_servers(hostnames[-1], *hostnames[:-1])
         logging.info('2. rendezvous_servers "Second" complete.')
         time.sleep(2)
 
         # A regular rendezvous, this time testing the abort functionality.
         try:
-            self.job.barrier(our_addr, 'Third', timeout=60,
+            self.job.barrier(our_addr, 'WillAbort', timeout=timeout,
                              listen_server=server
                              ).rendezvous(abort=True, *hostnames)
-        except barrier.BarrierAbortError:
+        except error.BarrierAbortError:
             pass
+        except error.BarrierError, e:
+            # We did get an error from the barrier, but was it acceptable or
+            # not?  Site code may not be able to indicate an explicit abort.
+            self.job.record('WARN', None, 'barriertest',
+                            'BarrierError %s instead of BarrierAbortError.' % e)
         else:
             raise error.TestFail('Explicit barrier rendezvous abort failed.')
-        logging.info('3. rendezvous(abort=True) "Third" complete.')
+        logging.info('3. rendezvous(abort=True) complete.')
         time.sleep(2)
 
         # Now attempt a rendezvous_servers that also includes the server.
-        self.job.barrier(our_addr, 'Final', timeout=60, listen_server=server
+        self.job.barrier(our_addr, 'FinalSync', timeout=timeout,
+                         listen_server=server
                          ).rendezvous_servers(master, *hostnames)
-        logging.info('N. rendezvous_servers "Final" complete.')
+        logging.info('4. rendezvous_servers "FinalSync" complete.')
+        time.sleep(2)
+
+        # rendezvous_servers, aborted from the master.
+        try:
+            self.job.barrier(our_addr, 'WillAbortServers', timeout=timeout,
+                             listen_server=server
+                             ).rendezvous_servers(master, *hostnames)
+        except error.BarrierAbortError:
+            pass
+        except error.BarrierError, e:
+            # We did get an error from the barrier, but was it acceptable or
+            # not?  Site code may not be able to indicate an explicit abort.
+            self.job.record('WARN', None, 'barriertest',
+                            'BarrierError %s instead of BarrierAbortError.' % e)
+        else:
+            raise error.TestFail('Explicit barrier rendezvous abort failed.')
+        logging.info('5. rendezvous_servers(abort=True) complete.')
         time.sleep(2)
 
         server.close()
--- autotest/server/base_utils.py       2010-04-13 18:15:53.000000000 -0700
+++ autotest/server/base_utils.py       2010-04-13 18:15:53.000000000 -0700
@@ -10,7 +10,7 @@
 
 import atexit, os, re, shutil, textwrap, sys, tempfile, types
 
-from autotest_lib.client.common_lib import utils
+from autotest_lib.client.common_lib import barrier, utils
 from autotest_lib.server import subcommand
 
 
@@ -323,3 +323,106 @@
     public_key.close()
 
     return public_key_str
+
+
+def get_sync_control_file(control, host_name, host_num,
+                          instance, num_jobs, port_base=63100):
+    """
+    This function is used when there is a need to run more than one
+    job simultaneously starting exactly at the same time. It basically returns
+    a modified control file (containing the synchronization code prepended)
+    whenever it is ready to run the control file. The synchronization
+    is done using barriers to make sure that the jobs start at the same time.
+
+    Here is how the synchronization is done to make sure that the tests
+    start at exactly the same time on the client.
+    sc_bar is a server barrier and s_bar, c_bar are the normal barriers
+
+                      Job1              Job2         ......      JobN
+     Server:   |                        sc_bar
+     Server:   |                        s_bar        ......      s_bar
+     Server:   |      at.run()         at.run()      ......      at.run()
+     ----------|------------------------------------------------------
+     Client    |      sc_bar
+     Client    |      c_bar             c_bar        ......      c_bar
+     Client    |    <run test>         <run test>    ......     <run test>
+
+    @param control: The control file to which the above synchronization
+            code will be prepended.
+    @param host_name: The host name on which the job is going to run.
+    @param host_num: (non negative) A number to identify the machine so that
+            we have different sets of s_bar_ports for each of the machines.
+    @param instance: The number of the job
+    @param num_jobs: Total number of jobs that are going to run in parallel
+            with this job starting at the same time.
+    @param port_base: Port number that is used to derive the actual barrier
+            ports.
+
+    @returns The modified control file.
+    """
+    sc_bar_port = port_base
+    c_bar_port = port_base
+    if host_num < 0:
+        print "Please provide a non negative number for the host"
+        return None
+    s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
+                                          # the same for a given machine
+
+    sc_bar_timeout = 180
+    s_bar_timeout = c_bar_timeout = 120
+
+    # The barrier code snippet is prepended into the control file
+    # dynamically before at.run() is called finally.
+    control_new = []
+
+    # jobid is the unique name used to identify the processes
+    # trying to reach the barriers
+    jobid = "%s#%d" % (host_name, instance)
+
+    rendv = []
+    # rendvstr is a temp holder for the rendezvous list of the processes
+    for n in range(num_jobs):
+        rendv.append("'%s#%d'" % (host_name, n))
+    rendvstr = ",".join(rendv)
+
+    if instance == 0:
+        # Do the setup and wait at the server barrier
+        # Clean up the tmp and the control dirs for the first instance
+        control_new.append('if os.path.exists(job.tmpdir):')
+        control_new.append("\t system('umount -f %s > /dev/null"
+                           "2> /dev/null' % job.tmpdir,"
+                           "ignore_status=True)")
+        control_new.append("\t system('rm -rf ' + job.tmpdir)")
+        control_new.append(
+            'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
+            % (jobid, sc_bar_timeout, sc_bar_port))
+        control_new.append(
+        'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
+         % jobid)
+
+    elif instance == 1:
+        # Wait at the server barrier to wait for instance=0
+        # process to complete setup
+        b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
+                     port=sc_bar_port)
+        b0.rendezvous_servers("PARALLEL_MASTER", jobid)
+
+        if(num_jobs > 2):
+            b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
+                         port=s_bar_port)
+            b1.rendezvous(rendvstr)
+
+    else:
+        # For the rest of the clients
+        b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
+        b2.rendezvous(rendvstr)
+
+    # Client side barrier for all the tests to start at the same time
+    control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
+                    % (jobid, c_bar_timeout, c_bar_port))
+    control_new.append("b1.rendezvous(%s)" % rendvstr)
+
+    # Stick in the rest of the control file
+    control_new.append(control)
+
+    return "\n".join(control_new)
--- autotest/server/tests/barriertest/control.srv       2010-04-13 16:00:38.000000000 -0700
+++ autotest/server/tests/barriertest/control.srv       2010-04-13 18:15:53.000000000 -0700
@@ -11,7 +11,7 @@
 events across multiple hosts.
 """
 
-from autotest_lib.client.common_lib import barrier
+from autotest_lib.client.common_lib import barrier, error
 from autotest_lib.server import autotest, hosts, subcommand
 
 if len(machines) > 3:
@@ -35,9 +35,27 @@
     subcommand_list.append(subcommand.subcommand(
             host_at.run, (control, host.hostname)))
 
-# Synchronize with all of the clients launched above from the autoserv host.
-final_barrier = barrier.barrier(master, 'Final', timeout=600)
-subcommand_list.append(subcommand.subcommand(
-        final_barrier.rendezvous_servers, [master] + machines))
+listen_server = barrier.listen_server()
+aborting_barrier = barrier.barrier(master, 'WillAbortServers', timeout=600,
+                                   listen_server=listen_server)
+final_barrier = barrier.barrier(master, 'FinalSync', timeout=600,
+                                listen_server=listen_server)
+def verify_server_barriers():
+    # Synchronize with all of the clients launched from the autoserv host.
+    final_barrier.rendezvous_servers(master, *machines)
+
+    try:
+        aborting_barrier.rendezvous_servers(master, abort=True, *machines)
+    except error.BarrierAbortError:
+        pass
+    except error.BarrierError, e:
+        # We did get an error from the barrier, but was it acceptable or
+        # not?  Site code may not be able to indicate an explicit abort.
+        job.record('WARN', None, 'barriertest',
+                   'BarrierError %s instead of BarrierAbortError.' % e)
+    else:
+        raise error.TestFail('Explicit barrier rendezvous abort failed.')
+
+subcommand_list.append(subcommand.subcommand(verify_server_barriers, ()))
 
 subcommand.parallel(subcommand_list)
--- autotest/utils/unittest_suite.py    2010-04-13 18:15:53.000000000 -0700
+++ autotest/utils/unittest_suite.py    2010-04-13 18:15:53.000000000 -0700
@@ -61,7 +61,7 @@
         ))
 
 LONG_RUNTIME = set((
-    'barrier_unittest.py',
+    'base_barrier_unittest.py',
     'logging_manager_test.py',
     ))
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to