LGTM, thanks

On Wed, 29 Apr 2015 at 15:58 'Klaus Aehlig' via ganeti-devel <
[email protected]> wrote:

> As the master daemon does not exist any more, the need to have special
> classes for handling requests to it is gone as well. Also remove the
> tests for the dead code.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/daemon.py                     |  52 ------
>  lib/server/masterd.py             | 119 --------------
>  test/py/ganeti.daemon_unittest.py | 326 --------------------------------------
>  3 files changed, 497 deletions(-)
>
> diff --git a/lib/daemon.py b/lib/daemon.py
> index cff0d84..3fff5d5 100644
> --- a/lib/daemon.py
> +++ b/lib/daemon.py
> @@ -142,58 +142,6 @@ class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
>      return False
>
>
> -class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
> -  """A stream server to use with asyncore.
> -
> -  Each request is accepted, and then dispatched to a separate asyncore
> -  dispatcher to handle.
> -
> -  """
> -
> -  _REQUEST_QUEUE_SIZE = 5
> -
> -  def __init__(self, family, address):
> -    """Constructor for AsyncStreamServer
> -
> -    @type family: integer
> -    @param family: socket family (one of socket.AF_*)
> -    @type address: address family dependent
> -    @param address: address to bind the socket to
> -
> -    """
> -    GanetiBaseAsyncoreDispatcher.__init__(self)
> -    self.family = family
> -    self.create_socket(self.family, socket.SOCK_STREAM)
> -    self.set_reuse_addr()
> -    self.bind(address)
> -    self.listen(self._REQUEST_QUEUE_SIZE)
> -
> -  # this method is overriding an asyncore.dispatcher method
> -  def handle_accept(self):
> -    """Accept a new client connection.
> -
> -    Creates a new instance of the handler class, which will use asyncore
> to
> -    serve the client.
> -
> -    """
> -    accept_result = utils.IgnoreSignals(self.accept)
> -    if accept_result is not None:
> -      connected_socket, client_address = accept_result
> -      if self.family == socket.AF_UNIX:
> -        # override the client address, as for unix sockets nothing
> meaningful
> -        # is passed in from accept anyway
> -        client_address = netutils.GetSocketCredentials(connected_socket)
> -      logging.info("Accepted connection from %s",
> -                   netutils.FormatAddress(client_address,
> family=self.family))
> -      self.handle_connection(connected_socket, client_address)
> -
> -  def handle_connection(self, connected_socket, client_address):
> -    """Handle an already accepted connection.
> -
> -    """
> -    raise NotImplementedError
> -
> -
>  class AsyncTerminatedMessageStream(asynchat.async_chat):
>    """A terminator separated message stream asyncore module.
>
> diff --git a/lib/server/masterd.py b/lib/server/masterd.py
> index be98f69..cff55b7 100644
> --- a/lib/server/masterd.py
> +++ b/lib/server/masterd.py
> @@ -38,25 +38,17 @@ inheritance from parent classes requires it.
>  # pylint: disable=C0103
>  # C0103: Invalid name ganeti-masterd
>
> -import os
> -import sys
> -import socket
>  import time
> -import tempfile
>  import logging
>
> -
>  from ganeti import config
>  from ganeti import constants
>  from ganeti import daemon
>  from ganeti import jqueue
>  from ganeti import luxi
> -import ganeti.rpc.errors as rpcerr
>  from ganeti import utils
>  from ganeti import errors
> -from ganeti import workerpool
>  import ganeti.rpc.node as rpc
> -import ganeti.rpc.client as rpccl
>  from ganeti import ht
>
>
> @@ -79,49 +71,6 @@ def _LogNewJob(status, info, ops):
>                   info, op_summary)
>
>
> -class ClientRequestWorker(workerpool.BaseWorker):
> -  # pylint: disable=W0221
> -  def RunTask(self, server, message, client):
> -    """Process the request.
> -
> -    """
> -    client_ops = ClientOps(server)
> -
> -    try:
> -      (method, args, ver) = rpccl.ParseRequest(message)
> -    except rpcerr.ProtocolError, err:
> -      logging.error("Protocol Error: %s", err)
> -      client.close_log()
> -      return
> -
> -    success = False
> -    try:
> -      # Verify client's version if there was one in the request
> -      if ver is not None and ver != constants.LUXI_VERSION:
> -        raise errors.LuxiError("LUXI version mismatch, server %s, request
> %s" %
> -                               (constants.LUXI_VERSION, ver))
> -
> -      result = client_ops.handle_request(method, args)
> -      success = True
> -    except errors.GenericError, err:
> -      logging.exception("Unexpected exception")
> -      success = False
> -      result = errors.EncodeException(err)
> -    except:
> -      logging.exception("Unexpected exception")
> -      err = sys.exc_info()
> -      result = "Caught exception: %s" % str(err[1])
> -
> -    try:
> -      reply = rpccl.FormatResponse(success, result)
> -      client.send_message(reply)
> -      # awake the main thread so that it can write out the data.
> -      server.awaker.signal()
> -    except: # pylint: disable=W0702
> -      logging.exception("Send error")
> -      client.close_log()
> -
> -
>  class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
>    """Handler for master peers.
>
> @@ -196,74 +145,6 @@ class _MasterShutdownCheck(object):
>        return remaining
>
>
> -class MasterServer(daemon.AsyncStreamServer):
> -  """Master Server.
> -
> -  This is the main asynchronous master server. It handles connections to
> the
> -  master socket.
> -
> -  """
> -  family = socket.AF_UNIX
> -
> -  def __init__(self, address, uid, gid):
> -    """MasterServer constructor
> -
> -    @param address: the unix socket address to bind the MasterServer to
> -    @param uid: The uid of the owner of the socket
> -    @param gid: The gid of the owner of the socket
> -
> -    """
> -    temp_name = tempfile.mktemp(dir=os.path.dirname(address))
> -    daemon.AsyncStreamServer.__init__(self, self.family, temp_name)
> -    os.chmod(temp_name, 0770)
> -    os.chown(temp_name, uid, gid)
> -    os.rename(temp_name, address)
> -
> -    self.awaker = daemon.AsyncAwaker()
> -
> -    # We'll only start threads once we've forked.
> -    self.context = None
> -    self.request_workers = None
> -
> -    self._shutdown_check = None
> -
> -  def handle_connection(self, connected_socket, client_address):
> -    # TODO: add connection count and limit the number of open connections
> to a
> -    # maximum number to avoid breaking for lack of file descriptors or
> memory.
> -    MasterClientHandler(self, connected_socket, client_address,
> self.family)
> -
> -  def setup_context(self):
> -    self.context = GanetiContext()
> -    self.request_workers = workerpool.WorkerPool("ClientReq",
> -                                                 CLIENT_REQUEST_WORKERS,
> -                                                 ClientRequestWorker)
> -
> -  def WaitForShutdown(self):
> -    """Prepares server for shutdown.
> -
> -    """
> -    if self._shutdown_check is None:
> -      self._shutdown_check = _MasterShutdownCheck()
> -
> -    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
> -
> -  def server_cleanup(self):
> -    """Cleanup the server.
> -
> -    This involves shutting down the processor threads and the master
> -    socket.
> -
> -    """
> -    try:
> -      self.close()
> -    finally:
> -      if self.request_workers:
> -        self.request_workers.TerminateWorkers()
> -      if self.context:
> -        self.context.jobqueue.Shutdown()
> -        self.context.livelock.close()
> -
> -
>  class ClientOps(object):
>    """Class holding high-level client operations."""
>    def __init__(self, server):
> diff --git a/test/py/ganeti.daemon_unittest.py b/test/py/ganeti.daemon_unittest.py
> index ed0f992..129600d 100755
> --- a/test/py/ganeti.daemon_unittest.py
> +++ b/test/py/ganeti.daemon_unittest.py
> @@ -290,332 +290,6 @@ class TestAsyncIP6UDPSocket(testutils.GanetiTestCase, _BaseAsyncUDPSocketTest):
>      _BaseAsyncUDPSocketTest.tearDown(self)
>
>
> -class _MyAsyncStreamServer(daemon.AsyncStreamServer):
> -
> -  def __init__(self, family, address, handle_connection_fn):
> -    daemon.AsyncStreamServer.__init__(self, family, address)
> -    self.handle_connection_fn = handle_connection_fn
> -    self.error_count = 0
> -    self.expt_count = 0
> -
> -  def handle_connection(self, connected_socket, client_address):
> -    self.handle_connection_fn(connected_socket, client_address)
> -
> -  def handle_error(self):
> -    self.error_count += 1
> -    self.close()
> -    raise
> -
> -  def handle_expt(self):
> -    self.expt_count += 1
> -    self.close()
> -
> -
> -class _MyMessageStreamHandler(daemon.AsyncTerminatedMessageStream):
> -
> -  def __init__(self, connected_socket, client_address, terminator, family,
> -               message_fn, client_id, unhandled_limit):
> -    daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
> -                                                 client_address,
> -                                                 terminator, family,
> -                                                 unhandled_limit)
> -    self.message_fn = message_fn
> -    self.client_id = client_id
> -    self.error_count = 0
> -
> -  def handle_message(self, message, message_id):
> -    self.message_fn(self, message, message_id)
> -
> -  def handle_error(self):
> -    self.error_count += 1
> -    raise
> -
> -
> -class TestAsyncStreamServerTCP(testutils.GanetiTestCase):
> -  """Test daemon.AsyncStreamServer with a TCP connection"""
> -
> -  family = socket.AF_INET
> -
> -  def setUp(self):
> -    testutils.GanetiTestCase.setUp(self)
> -    self.mainloop = daemon.Mainloop()
> -    self.address = self.getAddress()
> -    self.server = _MyAsyncStreamServer(self.family, self.address,
> -                                       self.handle_connection)
> -    self.client_handler = _MyMessageStreamHandler
> -    self.unhandled_limit = None
> -    self.terminator = "\3"
> -    self.address = self.server.getsockname()
> -    self.clients = []
> -    self.connections = []
> -    self.messages = {}
> -    self.connect_terminate_count = 0
> -    self.message_terminate_count = 0
> -    self.next_client_id = 0
> -    # Save utils.IgnoreSignals so we can do evil things to it...
> -    self.saved_utils_ignoresignals = utils.IgnoreSignals
> -
> -  def tearDown(self):
> -    for c in self.clients:
> -      c.close()
> -    for c in self.connections:
> -      c.close()
> -    self.server.close()
> -    # ...and restore it as well
> -    utils.IgnoreSignals = self.saved_utils_ignoresignals
> -    testutils.GanetiTestCase.tearDown(self)
> -
> -  def getAddress(self):
> -    return ("127.0.0.1", 0)
> -
> -  def countTerminate(self, name):
> -    value = getattr(self, name)
> -    if value is not None:
> -      value -= 1
> -      setattr(self, name, value)
> -      if value <= 0:
> -        os.kill(os.getpid(), signal.SIGTERM)
> -
> -  def handle_connection(self, connected_socket, client_address):
> -    client_id = self.next_client_id
> -    self.next_client_id += 1
> -    client_handler = self.client_handler(connected_socket, client_address,
> -                                         self.terminator, self.family,
> -                                         self.handle_message,
> -                                         client_id, self.unhandled_limit)
> -    self.connections.append(client_handler)
> -    self.countTerminate("connect_terminate_count")
> -
> -  def handle_message(self, handler, message, message_id):
> -    self.messages.setdefault(handler.client_id, [])
> -    # We should just check that the message_ids are monotonically
> increasing.
> -    # If in the unit tests we never remove messages from the received
> queue,
> -    # though, we can just require that the queue length is the same as the
> -    # message id, before pushing the message to it. This forces a more
> -    # restrictive check, but we can live with this for now.
> -    self.assertEquals(len(self.messages[handler.client_id]), message_id)
> -    self.messages[handler.client_id].append(message)
> -    if message == "error":
> -      raise errors.GenericError("error")
> -    self.countTerminate("message_terminate_count")
> -
> -  def getClient(self):
> -    client = socket.socket(self.family, socket.SOCK_STREAM)
> -    client.connect(self.address)
> -    self.clients.append(client)
> -    return client
> -
> -  def tearDown(self):
> -    testutils.GanetiTestCase.tearDown(self)
> -    self.server.close()
> -
> -  def testConnect(self):
> -    self.getClient()
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 1)
> -    self.getClient()
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 2)
> -    self.connect_terminate_count = 4
> -    self.getClient()
> -    self.getClient()
> -    self.getClient()
> -    self.getClient()
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 6)
> -
> -  def testBasicMessage(self):
> -    self.connect_terminate_count = None
> -    client = self.getClient()
> -    client.send("ciao\3")
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 1)
> -    self.assertEquals(len(self.messages[0]), 1)
> -    self.assertEquals(self.messages[0][0], "ciao")
> -
> -  def testDoubleMessage(self):
> -    self.connect_terminate_count = None
> -    client = self.getClient()
> -    client.send("ciao\3")
> -    self.mainloop.Run()
> -    client.send("foobar\3")
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 1)
> -    self.assertEquals(len(self.messages[0]), 2)
> -    self.assertEquals(self.messages[0][1], "foobar")
> -
> -  def testComposedMessage(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 3
> -    client = self.getClient()
> -    client.send("one\3composed\3message\3")
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.messages[0]), 3)
> -    self.assertEquals(self.messages[0], ["one", "composed", "message"])
> -
> -  def testLongTerminator(self):
> -    self.terminator = "\0\1\2"
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 3
> -    client = self.getClient()
> -    client.send("one\0\1\2composed\0\1\2message\0\1\2")
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.messages[0]), 3)
> -    self.assertEquals(self.messages[0], ["one", "composed", "message"])
> -
> -  def testErrorHandling(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = None
> -    client = self.getClient()
> -    client.send("one\3two\3error\3three\3")
> -    self.assertRaises(errors.GenericError, self.mainloop.Run)
> -    self.assertEquals(self.connections[0].error_count, 1)
> -    self.assertEquals(self.messages[0], ["one", "two", "error"])
> -    client.send("error\3")
> -    self.assertRaises(errors.GenericError, self.mainloop.Run)
> -    self.assertEquals(self.connections[0].error_count, 2)
> -    self.assertEquals(self.messages[0], ["one", "two", "error", "three",
> -                                         "error"])
> -
> -  def testDoubleClient(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 2
> -    client1 = self.getClient()
> -    client2 = self.getClient()
> -    client1.send("c1m1\3")
> -    client2.send("c2m1\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["c1m1"])
> -    self.assertEquals(self.messages[1], ["c2m1"])
> -
> -  def testUnterminatedMessage(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 3
> -    client1 = self.getClient()
> -    client2 = self.getClient()
> -    client1.send("message\3unterminated")
> -    client2.send("c2m1\3c2m2\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["message"])
> -    self.assertEquals(self.messages[1], ["c2m1", "c2m2"])
> -    client1.send("message\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["message",
> "unterminatedmessage"])
> -
> -  def testSignaledWhileAccepting(self):
> -    utils.IgnoreSignals = lambda fn, *args, **kwargs: None
> -    client1 = self.getClient()
> -    self.server.handle_accept()
> -    # When interrupted while accepting we don't have a connection, but we
> -    # didn't crash either.
> -    self.assertEquals(len(self.connections), 0)
> -    utils.IgnoreSignals = self.saved_utils_ignoresignals
> -    self.mainloop.Run()
> -    self.assertEquals(len(self.connections), 1)
> -
> -  def testSendMessage(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 3
> -    client1 = self.getClient()
> -    client2 = self.getClient()
> -    client1.send("one\3composed\3message\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["one", "composed", "message"])
> -    self.assertFalse(self.connections[0].writable())
> -    self.assertFalse(self.connections[1].writable())
> -    self.connections[0].send_message("r0")
> -    self.assert_(self.connections[0].writable())
> -    self.assertFalse(self.connections[1].writable())
> -    self.connections[0].send_message("r1")
> -    self.connections[0].send_message("r2")
> -    # We currently have no way to terminate the mainloop on write events,
> but
> -    # let's assume handle_write will be called if writable() is True.
> -    while self.connections[0].writable():
> -      self.connections[0].handle_write()
> -    client1.setblocking(0)
> -    client2.setblocking(0)
> -    self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3")
> -    self.assertRaises(socket.error, client2.recv, 4096)
> -
> -  def testLimitedUnhandledMessages(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 3
> -    self.unhandled_limit = 2
> -    client1 = self.getClient()
> -    client2 = self.getClient()
> -    client1.send("one\3composed\3long\3message\3")
> -    client2.send("c2one\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["one", "composed"])
> -    self.assertEquals(self.messages[1], ["c2one"])
> -    self.assertFalse(self.connections[0].readable())
> -    self.assert_(self.connections[1].readable())
> -    self.connections[0].send_message("r0")
> -    self.message_terminate_count = None
> -    client1.send("another\3")
> -    # when we write replies messages queued also get handled, but not the
> ones
> -    # in the socket.
> -    while self.connections[0].writable():
> -      self.connections[0].handle_write()
> -    self.assertFalse(self.connections[0].readable())
> -    self.assertEquals(self.messages[0], ["one", "composed", "long"])
> -    self.connections[0].send_message("r1")
> -    self.connections[0].send_message("r2")
> -    while self.connections[0].writable():
> -      self.connections[0].handle_write()
> -    self.assertEquals(self.messages[0], ["one", "composed", "long",
> "message"])
> -    self.assert_(self.connections[0].readable())
> -
> -  def testLimitedUnhandledMessagesOne(self):
> -    self.connect_terminate_count = None
> -    self.message_terminate_count = 2
> -    self.unhandled_limit = 1
> -    client1 = self.getClient()
> -    client2 = self.getClient()
> -    client1.send("one\3composed\3message\3")
> -    client2.send("c2one\3")
> -    self.mainloop.Run()
> -    self.assertEquals(self.messages[0], ["one"])
> -    self.assertEquals(self.messages[1], ["c2one"])
> -    self.assertFalse(self.connections[0].readable())
> -    self.assertFalse(self.connections[1].readable())
> -    self.connections[0].send_message("r0")
> -    self.message_terminate_count = None
> -    while self.connections[0].writable():
> -      self.connections[0].handle_write()
> -    self.assertFalse(self.connections[0].readable())
> -    self.assertEquals(self.messages[0], ["one", "composed"])
> -    self.connections[0].send_message("r2")
> -    self.connections[0].send_message("r3")
> -    while self.connections[0].writable():
> -      self.connections[0].handle_write()
> -    self.assertEquals(self.messages[0], ["one", "composed", "message"])
> -    self.assert_(self.connections[0].readable())
> -
> -
> -class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP):
> -  """Test daemon.AsyncStreamServer with a Unix path connection"""
> -
> -  family = socket.AF_UNIX
> -
> -  def getAddress(self):
> -    self.tmpdir = tempfile.mkdtemp()
> -    return os.path.join(self.tmpdir, "server.sock")
> -
> -  def tearDown(self):
> -    shutil.rmtree(self.tmpdir)
> -    TestAsyncStreamServerTCP.tearDown(self)
> -
> -
> -class TestAsyncStreamServerUnixAbstract(TestAsyncStreamServerTCP):
> -  """Test daemon.AsyncStreamServer with a Unix abstract connection"""
> -
> -  family = socket.AF_UNIX
> -
> -  def getAddress(self):
> -    return "\0myabstractsocketaddress"
> -
> -
>  class TestAsyncAwaker(testutils.GanetiTestCase):
>    """Test daemon.AsyncAwaker"""
>
> --
> 2.2.0.rc0.207.ga3a616c
>
>

Reply via email to