As the master daemon does not exist any more, the special classes for handling requests to it are no longer needed either. Also remove the tests for the dead code.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/daemon.py | 52 ------ lib/server/masterd.py | 119 -------------- test/py/ganeti.daemon_unittest.py | 326 -------------------------------------- 3 files changed, 497 deletions(-) diff --git a/lib/daemon.py b/lib/daemon.py index cff0d84..3fff5d5 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -142,58 +142,6 @@ class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher): return False -class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): - """A stream server to use with asyncore. - - Each request is accepted, and then dispatched to a separate asyncore - dispatcher to handle. - - """ - - _REQUEST_QUEUE_SIZE = 5 - - def __init__(self, family, address): - """Constructor for AsyncStreamServer - - @type family: integer - @param family: socket family (one of socket.AF_*) - @type address: address family dependent - @param address: address to bind the socket to - - """ - GanetiBaseAsyncoreDispatcher.__init__(self) - self.family = family - self.create_socket(self.family, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind(address) - self.listen(self._REQUEST_QUEUE_SIZE) - - # this method is overriding an asyncore.dispatcher method - def handle_accept(self): - """Accept a new client connection. - - Creates a new instance of the handler class, which will use asyncore to - serve the client. - - """ - accept_result = utils.IgnoreSignals(self.accept) - if accept_result is not None: - connected_socket, client_address = accept_result - if self.family == socket.AF_UNIX: - # override the client address, as for unix sockets nothing meaningful - # is passed in from accept anyway - client_address = netutils.GetSocketCredentials(connected_socket) - logging.info("Accepted connection from %s", - netutils.FormatAddress(client_address, family=self.family)) - self.handle_connection(connected_socket, client_address) - - def handle_connection(self, connected_socket, client_address): - """Handle an already accepted connection. 
- - """ - raise NotImplementedError - - class AsyncTerminatedMessageStream(asynchat.async_chat): """A terminator separated message stream asyncore module. diff --git a/lib/server/masterd.py b/lib/server/masterd.py index be98f69..cff55b7 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -38,25 +38,17 @@ inheritance from parent classes requires it. # pylint: disable=C0103 # C0103: Invalid name ganeti-masterd -import os -import sys -import socket import time -import tempfile import logging - from ganeti import config from ganeti import constants from ganeti import daemon from ganeti import jqueue from ganeti import luxi -import ganeti.rpc.errors as rpcerr from ganeti import utils from ganeti import errors -from ganeti import workerpool import ganeti.rpc.node as rpc -import ganeti.rpc.client as rpccl from ganeti import ht @@ -79,49 +71,6 @@ def _LogNewJob(status, info, ops): info, op_summary) -class ClientRequestWorker(workerpool.BaseWorker): - # pylint: disable=W0221 - def RunTask(self, server, message, client): - """Process the request. 
- - """ - client_ops = ClientOps(server) - - try: - (method, args, ver) = rpccl.ParseRequest(message) - except rpcerr.ProtocolError, err: - logging.error("Protocol Error: %s", err) - client.close_log() - return - - success = False - try: - # Verify client's version if there was one in the request - if ver is not None and ver != constants.LUXI_VERSION: - raise errors.LuxiError("LUXI version mismatch, server %s, request %s" % - (constants.LUXI_VERSION, ver)) - - result = client_ops.handle_request(method, args) - success = True - except errors.GenericError, err: - logging.exception("Unexpected exception") - success = False - result = errors.EncodeException(err) - except: - logging.exception("Unexpected exception") - err = sys.exc_info() - result = "Caught exception: %s" % str(err[1]) - - try: - reply = rpccl.FormatResponse(success, result) - client.send_message(reply) - # awake the main thread so that it can write out the data. - server.awaker.signal() - except: # pylint: disable=W0702 - logging.exception("Send error") - client.close_log() - - class MasterClientHandler(daemon.AsyncTerminatedMessageStream): """Handler for master peers. @@ -196,74 +145,6 @@ class _MasterShutdownCheck(object): return remaining -class MasterServer(daemon.AsyncStreamServer): - """Master Server. - - This is the main asynchronous master server. It handles connections to the - master socket. - - """ - family = socket.AF_UNIX - - def __init__(self, address, uid, gid): - """MasterServer constructor - - @param address: the unix socket address to bind the MasterServer to - @param uid: The uid of the owner of the socket - @param gid: The gid of the owner of the socket - - """ - temp_name = tempfile.mktemp(dir=os.path.dirname(address)) - daemon.AsyncStreamServer.__init__(self, self.family, temp_name) - os.chmod(temp_name, 0770) - os.chown(temp_name, uid, gid) - os.rename(temp_name, address) - - self.awaker = daemon.AsyncAwaker() - - # We'll only start threads once we've forked. 
- self.context = None - self.request_workers = None - - self._shutdown_check = None - - def handle_connection(self, connected_socket, client_address): - # TODO: add connection count and limit the number of open connections to a - # maximum number to avoid breaking for lack of file descriptors or memory. - MasterClientHandler(self, connected_socket, client_address, self.family) - - def setup_context(self): - self.context = GanetiContext() - self.request_workers = workerpool.WorkerPool("ClientReq", - CLIENT_REQUEST_WORKERS, - ClientRequestWorker) - - def WaitForShutdown(self): - """Prepares server for shutdown. - - """ - if self._shutdown_check is None: - self._shutdown_check = _MasterShutdownCheck() - - return self._shutdown_check(self.context.jobqueue.PrepareShutdown()) - - def server_cleanup(self): - """Cleanup the server. - - This involves shutting down the processor threads and the master - socket. - - """ - try: - self.close() - finally: - if self.request_workers: - self.request_workers.TerminateWorkers() - if self.context: - self.context.jobqueue.Shutdown() - self.context.livelock.close() - - class ClientOps(object): """Class holding high-level client operations.""" def __init__(self, server): diff --git a/test/py/ganeti.daemon_unittest.py b/test/py/ganeti.daemon_unittest.py index ed0f992..129600d 100755 --- a/test/py/ganeti.daemon_unittest.py +++ b/test/py/ganeti.daemon_unittest.py @@ -290,332 +290,6 @@ class TestAsyncIP6UDPSocket(testutils.GanetiTestCase, _BaseAsyncUDPSocketTest): _BaseAsyncUDPSocketTest.tearDown(self) -class _MyAsyncStreamServer(daemon.AsyncStreamServer): - - def __init__(self, family, address, handle_connection_fn): - daemon.AsyncStreamServer.__init__(self, family, address) - self.handle_connection_fn = handle_connection_fn - self.error_count = 0 - self.expt_count = 0 - - def handle_connection(self, connected_socket, client_address): - self.handle_connection_fn(connected_socket, client_address) - - def handle_error(self): - 
self.error_count += 1 - self.close() - raise - - def handle_expt(self): - self.expt_count += 1 - self.close() - - -class _MyMessageStreamHandler(daemon.AsyncTerminatedMessageStream): - - def __init__(self, connected_socket, client_address, terminator, family, - message_fn, client_id, unhandled_limit): - daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket, - client_address, - terminator, family, - unhandled_limit) - self.message_fn = message_fn - self.client_id = client_id - self.error_count = 0 - - def handle_message(self, message, message_id): - self.message_fn(self, message, message_id) - - def handle_error(self): - self.error_count += 1 - raise - - -class TestAsyncStreamServerTCP(testutils.GanetiTestCase): - """Test daemon.AsyncStreamServer with a TCP connection""" - - family = socket.AF_INET - - def setUp(self): - testutils.GanetiTestCase.setUp(self) - self.mainloop = daemon.Mainloop() - self.address = self.getAddress() - self.server = _MyAsyncStreamServer(self.family, self.address, - self.handle_connection) - self.client_handler = _MyMessageStreamHandler - self.unhandled_limit = None - self.terminator = "\3" - self.address = self.server.getsockname() - self.clients = [] - self.connections = [] - self.messages = {} - self.connect_terminate_count = 0 - self.message_terminate_count = 0 - self.next_client_id = 0 - # Save utils.IgnoreSignals so we can do evil things to it... 
- self.saved_utils_ignoresignals = utils.IgnoreSignals - - def tearDown(self): - for c in self.clients: - c.close() - for c in self.connections: - c.close() - self.server.close() - # ...and restore it as well - utils.IgnoreSignals = self.saved_utils_ignoresignals - testutils.GanetiTestCase.tearDown(self) - - def getAddress(self): - return ("127.0.0.1", 0) - - def countTerminate(self, name): - value = getattr(self, name) - if value is not None: - value -= 1 - setattr(self, name, value) - if value <= 0: - os.kill(os.getpid(), signal.SIGTERM) - - def handle_connection(self, connected_socket, client_address): - client_id = self.next_client_id - self.next_client_id += 1 - client_handler = self.client_handler(connected_socket, client_address, - self.terminator, self.family, - self.handle_message, - client_id, self.unhandled_limit) - self.connections.append(client_handler) - self.countTerminate("connect_terminate_count") - - def handle_message(self, handler, message, message_id): - self.messages.setdefault(handler.client_id, []) - # We should just check that the message_ids are monotonically increasing. - # If in the unit tests we never remove messages from the received queue, - # though, we can just require that the queue length is the same as the - # message id, before pushing the message to it. This forces a more - # restrictive check, but we can live with this for now. 
- self.assertEquals(len(self.messages[handler.client_id]), message_id) - self.messages[handler.client_id].append(message) - if message == "error": - raise errors.GenericError("error") - self.countTerminate("message_terminate_count") - - def getClient(self): - client = socket.socket(self.family, socket.SOCK_STREAM) - client.connect(self.address) - self.clients.append(client) - return client - - def tearDown(self): - testutils.GanetiTestCase.tearDown(self) - self.server.close() - - def testConnect(self): - self.getClient() - self.mainloop.Run() - self.assertEquals(len(self.connections), 1) - self.getClient() - self.mainloop.Run() - self.assertEquals(len(self.connections), 2) - self.connect_terminate_count = 4 - self.getClient() - self.getClient() - self.getClient() - self.getClient() - self.mainloop.Run() - self.assertEquals(len(self.connections), 6) - - def testBasicMessage(self): - self.connect_terminate_count = None - client = self.getClient() - client.send("ciao\3") - self.mainloop.Run() - self.assertEquals(len(self.connections), 1) - self.assertEquals(len(self.messages[0]), 1) - self.assertEquals(self.messages[0][0], "ciao") - - def testDoubleMessage(self): - self.connect_terminate_count = None - client = self.getClient() - client.send("ciao\3") - self.mainloop.Run() - client.send("foobar\3") - self.mainloop.Run() - self.assertEquals(len(self.connections), 1) - self.assertEquals(len(self.messages[0]), 2) - self.assertEquals(self.messages[0][1], "foobar") - - def testComposedMessage(self): - self.connect_terminate_count = None - self.message_terminate_count = 3 - client = self.getClient() - client.send("one\3composed\3message\3") - self.mainloop.Run() - self.assertEquals(len(self.messages[0]), 3) - self.assertEquals(self.messages[0], ["one", "composed", "message"]) - - def testLongTerminator(self): - self.terminator = "\0\1\2" - self.connect_terminate_count = None - self.message_terminate_count = 3 - client = self.getClient() - 
client.send("one\0\1\2composed\0\1\2message\0\1\2") - self.mainloop.Run() - self.assertEquals(len(self.messages[0]), 3) - self.assertEquals(self.messages[0], ["one", "composed", "message"]) - - def testErrorHandling(self): - self.connect_terminate_count = None - self.message_terminate_count = None - client = self.getClient() - client.send("one\3two\3error\3three\3") - self.assertRaises(errors.GenericError, self.mainloop.Run) - self.assertEquals(self.connections[0].error_count, 1) - self.assertEquals(self.messages[0], ["one", "two", "error"]) - client.send("error\3") - self.assertRaises(errors.GenericError, self.mainloop.Run) - self.assertEquals(self.connections[0].error_count, 2) - self.assertEquals(self.messages[0], ["one", "two", "error", "three", - "error"]) - - def testDoubleClient(self): - self.connect_terminate_count = None - self.message_terminate_count = 2 - client1 = self.getClient() - client2 = self.getClient() - client1.send("c1m1\3") - client2.send("c2m1\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["c1m1"]) - self.assertEquals(self.messages[1], ["c2m1"]) - - def testUnterminatedMessage(self): - self.connect_terminate_count = None - self.message_terminate_count = 3 - client1 = self.getClient() - client2 = self.getClient() - client1.send("message\3unterminated") - client2.send("c2m1\3c2m2\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["message"]) - self.assertEquals(self.messages[1], ["c2m1", "c2m2"]) - client1.send("message\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["message", "unterminatedmessage"]) - - def testSignaledWhileAccepting(self): - utils.IgnoreSignals = lambda fn, *args, **kwargs: None - client1 = self.getClient() - self.server.handle_accept() - # When interrupted while accepting we don't have a connection, but we - # didn't crash either. 
- self.assertEquals(len(self.connections), 0) - utils.IgnoreSignals = self.saved_utils_ignoresignals - self.mainloop.Run() - self.assertEquals(len(self.connections), 1) - - def testSendMessage(self): - self.connect_terminate_count = None - self.message_terminate_count = 3 - client1 = self.getClient() - client2 = self.getClient() - client1.send("one\3composed\3message\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["one", "composed", "message"]) - self.assertFalse(self.connections[0].writable()) - self.assertFalse(self.connections[1].writable()) - self.connections[0].send_message("r0") - self.assert_(self.connections[0].writable()) - self.assertFalse(self.connections[1].writable()) - self.connections[0].send_message("r1") - self.connections[0].send_message("r2") - # We currently have no way to terminate the mainloop on write events, but - # let's assume handle_write will be called if writable() is True. - while self.connections[0].writable(): - self.connections[0].handle_write() - client1.setblocking(0) - client2.setblocking(0) - self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3") - self.assertRaises(socket.error, client2.recv, 4096) - - def testLimitedUnhandledMessages(self): - self.connect_terminate_count = None - self.message_terminate_count = 3 - self.unhandled_limit = 2 - client1 = self.getClient() - client2 = self.getClient() - client1.send("one\3composed\3long\3message\3") - client2.send("c2one\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["one", "composed"]) - self.assertEquals(self.messages[1], ["c2one"]) - self.assertFalse(self.connections[0].readable()) - self.assert_(self.connections[1].readable()) - self.connections[0].send_message("r0") - self.message_terminate_count = None - client1.send("another\3") - # when we write replies messages queued also get handled, but not the ones - # in the socket. 
- while self.connections[0].writable(): - self.connections[0].handle_write() - self.assertFalse(self.connections[0].readable()) - self.assertEquals(self.messages[0], ["one", "composed", "long"]) - self.connections[0].send_message("r1") - self.connections[0].send_message("r2") - while self.connections[0].writable(): - self.connections[0].handle_write() - self.assertEquals(self.messages[0], ["one", "composed", "long", "message"]) - self.assert_(self.connections[0].readable()) - - def testLimitedUnhandledMessagesOne(self): - self.connect_terminate_count = None - self.message_terminate_count = 2 - self.unhandled_limit = 1 - client1 = self.getClient() - client2 = self.getClient() - client1.send("one\3composed\3message\3") - client2.send("c2one\3") - self.mainloop.Run() - self.assertEquals(self.messages[0], ["one"]) - self.assertEquals(self.messages[1], ["c2one"]) - self.assertFalse(self.connections[0].readable()) - self.assertFalse(self.connections[1].readable()) - self.connections[0].send_message("r0") - self.message_terminate_count = None - while self.connections[0].writable(): - self.connections[0].handle_write() - self.assertFalse(self.connections[0].readable()) - self.assertEquals(self.messages[0], ["one", "composed"]) - self.connections[0].send_message("r2") - self.connections[0].send_message("r3") - while self.connections[0].writable(): - self.connections[0].handle_write() - self.assertEquals(self.messages[0], ["one", "composed", "message"]) - self.assert_(self.connections[0].readable()) - - -class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): - """Test daemon.AsyncStreamServer with a Unix path connection""" - - family = socket.AF_UNIX - - def getAddress(self): - self.tmpdir = tempfile.mkdtemp() - return os.path.join(self.tmpdir, "server.sock") - - def tearDown(self): - shutil.rmtree(self.tmpdir) - TestAsyncStreamServerTCP.tearDown(self) - - -class TestAsyncStreamServerUnixAbstract(TestAsyncStreamServerTCP): - """Test daemon.AsyncStreamServer with a 
Unix abstract connection""" - - family = socket.AF_UNIX - - def getAddress(self): - return "\0myabstractsocketaddress" - - class TestAsyncAwaker(testutils.GanetiTestCase): """Test daemon.AsyncAwaker""" -- 2.2.0.rc0.207.ga3a616c
