LGTM, thanks.

On Wed, 29 Apr 2015 at 15:58 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote:
> As master daemon does not exist any more, the need to have special > classes for handling requests to it is gone as well. Also remove the > tests for the dead code. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/daemon.py | 52 ------ > lib/server/masterd.py | 119 -------------- > test/py/ganeti.daemon_unittest.py | 326 > -------------------------------------- > 3 files changed, 497 deletions(-) > > diff --git a/lib/daemon.py b/lib/daemon.py > index cff0d84..3fff5d5 100644 > --- a/lib/daemon.py > +++ b/lib/daemon.py > @@ -142,58 +142,6 @@ class > GanetiBaseAsyncoreDispatcher(asyncore.dispatcher): > return False > > > -class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): > - """A stream server to use with asyncore. > - > - Each request is accepted, and then dispatched to a separate asyncore > - dispatcher to handle. > - > - """ > - > - _REQUEST_QUEUE_SIZE = 5 > - > - def __init__(self, family, address): > - """Constructor for AsyncStreamServer > - > - @type family: integer > - @param family: socket family (one of socket.AF_*) > - @type address: address family dependent > - @param address: address to bind the socket to > - > - """ > - GanetiBaseAsyncoreDispatcher.__init__(self) > - self.family = family > - self.create_socket(self.family, socket.SOCK_STREAM) > - self.set_reuse_addr() > - self.bind(address) > - self.listen(self._REQUEST_QUEUE_SIZE) > - > - # this method is overriding an asyncore.dispatcher method > - def handle_accept(self): > - """Accept a new client connection. > - > - Creates a new instance of the handler class, which will use asyncore > to > - serve the client. 
> - > - """ > - accept_result = utils.IgnoreSignals(self.accept) > - if accept_result is not None: > - connected_socket, client_address = accept_result > - if self.family == socket.AF_UNIX: > - # override the client address, as for unix sockets nothing > meaningful > - # is passed in from accept anyway > - client_address = netutils.GetSocketCredentials(connected_socket) > - logging.info("Accepted connection from %s", > - netutils.FormatAddress(client_address, > family=self.family)) > - self.handle_connection(connected_socket, client_address) > - > - def handle_connection(self, connected_socket, client_address): > - """Handle an already accepted connection. > - > - """ > - raise NotImplementedError > - > - > class AsyncTerminatedMessageStream(asynchat.async_chat): > """A terminator separated message stream asyncore module. > > diff --git a/lib/server/masterd.py b/lib/server/masterd.py > index be98f69..cff55b7 100644 > --- a/lib/server/masterd.py > +++ b/lib/server/masterd.py > @@ -38,25 +38,17 @@ inheritance from parent classes requires it. > # pylint: disable=C0103 > # C0103: Invalid name ganeti-masterd > > -import os > -import sys > -import socket > import time > -import tempfile > import logging > > - > from ganeti import config > from ganeti import constants > from ganeti import daemon > from ganeti import jqueue > from ganeti import luxi > -import ganeti.rpc.errors as rpcerr > from ganeti import utils > from ganeti import errors > -from ganeti import workerpool > import ganeti.rpc.node as rpc > -import ganeti.rpc.client as rpccl > from ganeti import ht > > > @@ -79,49 +71,6 @@ def _LogNewJob(status, info, ops): > info, op_summary) > > > -class ClientRequestWorker(workerpool.BaseWorker): > - # pylint: disable=W0221 > - def RunTask(self, server, message, client): > - """Process the request. 
> - > - """ > - client_ops = ClientOps(server) > - > - try: > - (method, args, ver) = rpccl.ParseRequest(message) > - except rpcerr.ProtocolError, err: > - logging.error("Protocol Error: %s", err) > - client.close_log() > - return > - > - success = False > - try: > - # Verify client's version if there was one in the request > - if ver is not None and ver != constants.LUXI_VERSION: > - raise errors.LuxiError("LUXI version mismatch, server %s, request > %s" % > - (constants.LUXI_VERSION, ver)) > - > - result = client_ops.handle_request(method, args) > - success = True > - except errors.GenericError, err: > - logging.exception("Unexpected exception") > - success = False > - result = errors.EncodeException(err) > - except: > - logging.exception("Unexpected exception") > - err = sys.exc_info() > - result = "Caught exception: %s" % str(err[1]) > - > - try: > - reply = rpccl.FormatResponse(success, result) > - client.send_message(reply) > - # awake the main thread so that it can write out the data. > - server.awaker.signal() > - except: # pylint: disable=W0702 > - logging.exception("Send error") > - client.close_log() > - > - > class MasterClientHandler(daemon.AsyncTerminatedMessageStream): > """Handler for master peers. > > @@ -196,74 +145,6 @@ class _MasterShutdownCheck(object): > return remaining > > > -class MasterServer(daemon.AsyncStreamServer): > - """Master Server. > - > - This is the main asynchronous master server. It handles connections to > the > - master socket. 
> - > - """ > - family = socket.AF_UNIX > - > - def __init__(self, address, uid, gid): > - """MasterServer constructor > - > - @param address: the unix socket address to bind the MasterServer to > - @param uid: The uid of the owner of the socket > - @param gid: The gid of the owner of the socket > - > - """ > - temp_name = tempfile.mktemp(dir=os.path.dirname(address)) > - daemon.AsyncStreamServer.__init__(self, self.family, temp_name) > - os.chmod(temp_name, 0770) > - os.chown(temp_name, uid, gid) > - os.rename(temp_name, address) > - > - self.awaker = daemon.AsyncAwaker() > - > - # We'll only start threads once we've forked. > - self.context = None > - self.request_workers = None > - > - self._shutdown_check = None > - > - def handle_connection(self, connected_socket, client_address): > - # TODO: add connection count and limit the number of open connections > to a > - # maximum number to avoid breaking for lack of file descriptors or > memory. > - MasterClientHandler(self, connected_socket, client_address, > self.family) > - > - def setup_context(self): > - self.context = GanetiContext() > - self.request_workers = workerpool.WorkerPool("ClientReq", > - CLIENT_REQUEST_WORKERS, > - ClientRequestWorker) > - > - def WaitForShutdown(self): > - """Prepares server for shutdown. > - > - """ > - if self._shutdown_check is None: > - self._shutdown_check = _MasterShutdownCheck() > - > - return self._shutdown_check(self.context.jobqueue.PrepareShutdown()) > - > - def server_cleanup(self): > - """Cleanup the server. > - > - This involves shutting down the processor threads and the master > - socket. 
> - > - """ > - try: > - self.close() > - finally: > - if self.request_workers: > - self.request_workers.TerminateWorkers() > - if self.context: > - self.context.jobqueue.Shutdown() > - self.context.livelock.close() > - > - > class ClientOps(object): > """Class holding high-level client operations.""" > def __init__(self, server): > diff --git a/test/py/ganeti.daemon_unittest.py b/test/py/ > ganeti.daemon_unittest.py > index ed0f992..129600d 100755 > --- a/test/py/ganeti.daemon_unittest.py > +++ b/test/py/ganeti.daemon_unittest.py > @@ -290,332 +290,6 @@ class > TestAsyncIP6UDPSocket(testutils.GanetiTestCase, _BaseAsyncUDPSocketTest): > _BaseAsyncUDPSocketTest.tearDown(self) > > > -class _MyAsyncStreamServer(daemon.AsyncStreamServer): > - > - def __init__(self, family, address, handle_connection_fn): > - daemon.AsyncStreamServer.__init__(self, family, address) > - self.handle_connection_fn = handle_connection_fn > - self.error_count = 0 > - self.expt_count = 0 > - > - def handle_connection(self, connected_socket, client_address): > - self.handle_connection_fn(connected_socket, client_address) > - > - def handle_error(self): > - self.error_count += 1 > - self.close() > - raise > - > - def handle_expt(self): > - self.expt_count += 1 > - self.close() > - > - > -class _MyMessageStreamHandler(daemon.AsyncTerminatedMessageStream): > - > - def __init__(self, connected_socket, client_address, terminator, family, > - message_fn, client_id, unhandled_limit): > - daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, > - client_address, > - terminator, family, > - unhandled_limit) > - self.message_fn = message_fn > - self.client_id = client_id > - self.error_count = 0 > - > - def handle_message(self, message, message_id): > - self.message_fn(self, message, message_id) > - > - def handle_error(self): > - self.error_count += 1 > - raise > - > - > -class TestAsyncStreamServerTCP(testutils.GanetiTestCase): > - """Test daemon.AsyncStreamServer with a TCP 
connection""" > - > - family = socket.AF_INET > - > - def setUp(self): > - testutils.GanetiTestCase.setUp(self) > - self.mainloop = daemon.Mainloop() > - self.address = self.getAddress() > - self.server = _MyAsyncStreamServer(self.family, self.address, > - self.handle_connection) > - self.client_handler = _MyMessageStreamHandler > - self.unhandled_limit = None > - self.terminator = "\3" > - self.address = self.server.getsockname() > - self.clients = [] > - self.connections = [] > - self.messages = {} > - self.connect_terminate_count = 0 > - self.message_terminate_count = 0 > - self.next_client_id = 0 > - # Save utils.IgnoreSignals so we can do evil things to it... > - self.saved_utils_ignoresignals = utils.IgnoreSignals > - > - def tearDown(self): > - for c in self.clients: > - c.close() > - for c in self.connections: > - c.close() > - self.server.close() > - # ...and restore it as well > - utils.IgnoreSignals = self.saved_utils_ignoresignals > - testutils.GanetiTestCase.tearDown(self) > - > - def getAddress(self): > - return ("127.0.0.1", 0) > - > - def countTerminate(self, name): > - value = getattr(self, name) > - if value is not None: > - value -= 1 > - setattr(self, name, value) > - if value <= 0: > - os.kill(os.getpid(), signal.SIGTERM) > - > - def handle_connection(self, connected_socket, client_address): > - client_id = self.next_client_id > - self.next_client_id += 1 > - client_handler = self.client_handler(connected_socket, client_address, > - self.terminator, self.family, > - self.handle_message, > - client_id, self.unhandled_limit) > - self.connections.append(client_handler) > - self.countTerminate("connect_terminate_count") > - > - def handle_message(self, handler, message, message_id): > - self.messages.setdefault(handler.client_id, []) > - # We should just check that the message_ids are monotonically > increasing. 
> - # If in the unit tests we never remove messages from the received > queue, > - # though, we can just require that the queue length is the same as the > - # message id, before pushing the message to it. This forces a more > - # restrictive check, but we can live with this for now. > - self.assertEquals(len(self.messages[handler.client_id]), message_id) > - self.messages[handler.client_id].append(message) > - if message == "error": > - raise errors.GenericError("error") > - self.countTerminate("message_terminate_count") > - > - def getClient(self): > - client = socket.socket(self.family, socket.SOCK_STREAM) > - client.connect(self.address) > - self.clients.append(client) > - return client > - > - def tearDown(self): > - testutils.GanetiTestCase.tearDown(self) > - self.server.close() > - > - def testConnect(self): > - self.getClient() > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 1) > - self.getClient() > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 2) > - self.connect_terminate_count = 4 > - self.getClient() > - self.getClient() > - self.getClient() > - self.getClient() > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 6) > - > - def testBasicMessage(self): > - self.connect_terminate_count = None > - client = self.getClient() > - client.send("ciao\3") > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 1) > - self.assertEquals(len(self.messages[0]), 1) > - self.assertEquals(self.messages[0][0], "ciao") > - > - def testDoubleMessage(self): > - self.connect_terminate_count = None > - client = self.getClient() > - client.send("ciao\3") > - self.mainloop.Run() > - client.send("foobar\3") > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 1) > - self.assertEquals(len(self.messages[0]), 2) > - self.assertEquals(self.messages[0][1], "foobar") > - > - def testComposedMessage(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 3 > - client = 
self.getClient() > - client.send("one\3composed\3message\3") > - self.mainloop.Run() > - self.assertEquals(len(self.messages[0]), 3) > - self.assertEquals(self.messages[0], ["one", "composed", "message"]) > - > - def testLongTerminator(self): > - self.terminator = "\0\1\2" > - self.connect_terminate_count = None > - self.message_terminate_count = 3 > - client = self.getClient() > - client.send("one\0\1\2composed\0\1\2message\0\1\2") > - self.mainloop.Run() > - self.assertEquals(len(self.messages[0]), 3) > - self.assertEquals(self.messages[0], ["one", "composed", "message"]) > - > - def testErrorHandling(self): > - self.connect_terminate_count = None > - self.message_terminate_count = None > - client = self.getClient() > - client.send("one\3two\3error\3three\3") > - self.assertRaises(errors.GenericError, self.mainloop.Run) > - self.assertEquals(self.connections[0].error_count, 1) > - self.assertEquals(self.messages[0], ["one", "two", "error"]) > - client.send("error\3") > - self.assertRaises(errors.GenericError, self.mainloop.Run) > - self.assertEquals(self.connections[0].error_count, 2) > - self.assertEquals(self.messages[0], ["one", "two", "error", "three", > - "error"]) > - > - def testDoubleClient(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 2 > - client1 = self.getClient() > - client2 = self.getClient() > - client1.send("c1m1\3") > - client2.send("c2m1\3") > - self.mainloop.Run() > - self.assertEquals(self.messages[0], ["c1m1"]) > - self.assertEquals(self.messages[1], ["c2m1"]) > - > - def testUnterminatedMessage(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 3 > - client1 = self.getClient() > - client2 = self.getClient() > - client1.send("message\3unterminated") > - client2.send("c2m1\3c2m2\3") > - self.mainloop.Run() > - self.assertEquals(self.messages[0], ["message"]) > - self.assertEquals(self.messages[1], ["c2m1", "c2m2"]) > - client1.send("message\3") > - self.mainloop.Run() > - 
self.assertEquals(self.messages[0], ["message", > "unterminatedmessage"]) > - > - def testSignaledWhileAccepting(self): > - utils.IgnoreSignals = lambda fn, *args, **kwargs: None > - client1 = self.getClient() > - self.server.handle_accept() > - # When interrupted while accepting we don't have a connection, but we > - # didn't crash either. > - self.assertEquals(len(self.connections), 0) > - utils.IgnoreSignals = self.saved_utils_ignoresignals > - self.mainloop.Run() > - self.assertEquals(len(self.connections), 1) > - > - def testSendMessage(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 3 > - client1 = self.getClient() > - client2 = self.getClient() > - client1.send("one\3composed\3message\3") > - self.mainloop.Run() > - self.assertEquals(self.messages[0], ["one", "composed", "message"]) > - self.assertFalse(self.connections[0].writable()) > - self.assertFalse(self.connections[1].writable()) > - self.connections[0].send_message("r0") > - self.assert_(self.connections[0].writable()) > - self.assertFalse(self.connections[1].writable()) > - self.connections[0].send_message("r1") > - self.connections[0].send_message("r2") > - # We currently have no way to terminate the mainloop on write events, > but > - # let's assume handle_write will be called if writable() is True. 
> - while self.connections[0].writable(): > - self.connections[0].handle_write() > - client1.setblocking(0) > - client2.setblocking(0) > - self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3") > - self.assertRaises(socket.error, client2.recv, 4096) > - > - def testLimitedUnhandledMessages(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 3 > - self.unhandled_limit = 2 > - client1 = self.getClient() > - client2 = self.getClient() > - client1.send("one\3composed\3long\3message\3") > - client2.send("c2one\3") > - self.mainloop.Run() > - self.assertEquals(self.messages[0], ["one", "composed"]) > - self.assertEquals(self.messages[1], ["c2one"]) > - self.assertFalse(self.connections[0].readable()) > - self.assert_(self.connections[1].readable()) > - self.connections[0].send_message("r0") > - self.message_terminate_count = None > - client1.send("another\3") > - # when we write replies messages queued also get handled, but not the > ones > - # in the socket. > - while self.connections[0].writable(): > - self.connections[0].handle_write() > - self.assertFalse(self.connections[0].readable()) > - self.assertEquals(self.messages[0], ["one", "composed", "long"]) > - self.connections[0].send_message("r1") > - self.connections[0].send_message("r2") > - while self.connections[0].writable(): > - self.connections[0].handle_write() > - self.assertEquals(self.messages[0], ["one", "composed", "long", > "message"]) > - self.assert_(self.connections[0].readable()) > - > - def testLimitedUnhandledMessagesOne(self): > - self.connect_terminate_count = None > - self.message_terminate_count = 2 > - self.unhandled_limit = 1 > - client1 = self.getClient() > - client2 = self.getClient() > - client1.send("one\3composed\3message\3") > - client2.send("c2one\3") > - self.mainloop.Run() > - self.assertEquals(self.messages[0], ["one"]) > - self.assertEquals(self.messages[1], ["c2one"]) > - self.assertFalse(self.connections[0].readable()) > - 
self.assertFalse(self.connections[1].readable()) > - self.connections[0].send_message("r0") > - self.message_terminate_count = None > - while self.connections[0].writable(): > - self.connections[0].handle_write() > - self.assertFalse(self.connections[0].readable()) > - self.assertEquals(self.messages[0], ["one", "composed"]) > - self.connections[0].send_message("r2") > - self.connections[0].send_message("r3") > - while self.connections[0].writable(): > - self.connections[0].handle_write() > - self.assertEquals(self.messages[0], ["one", "composed", "message"]) > - self.assert_(self.connections[0].readable()) > - > - > -class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): > - """Test daemon.AsyncStreamServer with a Unix path connection""" > - > - family = socket.AF_UNIX > - > - def getAddress(self): > - self.tmpdir = tempfile.mkdtemp() > - return os.path.join(self.tmpdir, "server.sock") > - > - def tearDown(self): > - shutil.rmtree(self.tmpdir) > - TestAsyncStreamServerTCP.tearDown(self) > - > - > -class TestAsyncStreamServerUnixAbstract(TestAsyncStreamServerTCP): > - """Test daemon.AsyncStreamServer with a Unix abstract connection""" > - > - family = socket.AF_UNIX > - > - def getAddress(self): > - return "\0myabstractsocketaddress" > - > - > class TestAsyncAwaker(testutils.GanetiTestCase): > """Test daemon.AsyncAwaker""" > > -- > 2.2.0.rc0.207.ga3a616c > >
