With this patch we convert the MasterClientHandler class to async_chat. ClientRqHandler is completely replaced, and now it's the main thread that reads asynchronously from all sockets. Then each luxi request is dispatched as a separate job to a ClientRequestWorker. This makes it so that just connection a client to the masterd doesn't waste a thread to serve the connection, and makes DOSing the server by just generating luxi connections a lot harder.
More work on wait-for-job-changes is needed before the CLIENT_REQUEST_WORKERS size can be reduced. Signed-off-by: Guido Trotter <[email protected]> --- daemons/ganeti-masterd | 125 +++++++++++++++++++++++++---------------------- 1 files changed, 66 insertions(+), 59 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 1ca2199..b0337f8 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -31,10 +31,10 @@ inheritance from parent classes requires it. import sys import socket -import SocketServer import time import collections import logging +import asynchat from optparse import OptionParser @@ -62,23 +62,79 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR class ClientRequestWorker(workerpool.BaseWorker): # pylint: disable-msg=W0221 - def RunTask(self, server, request, client_address): + def RunTask(self, server, message, client): """Process the request. """ + client_ops = ClientOps(server) + try: - server.request_handler_class(request, client_address, server) - finally: - request.close() + (method, args) = luxi.ParseRequest(message) + except luxi.ProtocolError, err: + logging.error("Protocol Error: %s" % err) + client.close_log() + return + success = False + try: + result = client_ops.handle_request(method, args) + success = True + except errors.GenericError, err: + logging.exception("Unexpected exception") + success = False + result = errors.EncodeException(err) + except: + logging.exception("Unexpected exception") + err = sys.exc_info() + result = "Caught exception: %s" % str(err[1]) -class MasterClientHandler(object): - # TODO: change this to a real asyncore server + # FIXME: send data via push :) + client.connected_socket.sendall(luxi.FormatResponse(success, result) + constants.LUXI_EOM) + #client.push(serializer.DumpJson(response) + constants.LUXI_EOM) + + +class MasterClientHandler(asynchat.async_chat): """Handler for master requests. """ def __init__(self, server, connected_socket, socket_family, client_address): - server.request_workers.AddTask(server, connected_socket, client_address) + # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by + # using a positional argument rather than a keyword one. + asynchat.async_chat.__init__(self, connected_socket) + self.ibuffer = [] + self.server = server + self.socket_family = socket_family + self.client_address = client_address + self.set_terminator(constants.LUXI_EOM) + self.connected_socket = connected_socket + + # this method is overriding an asynchat.async_chat method + def collect_incoming_data(self, data): + self.ibuffer.append(data) + + # this method is overriding an asynchat.async_chat method + def found_terminator(self): + message = "".join(self.ibuffer) + self.ibuffer = [] + self.server.request_workers.AddTask(self.server, message, self) + + def close_log(self): + logging.info("closing connection from %s" + % self.server.format_address(self.socket_family, + self.client_address)) + self.close() + + # this method is overriding an asyncore.dispatcher method + def handle_expt(self): + self.close_log() + + # this method is overriding an asyncore.dispatcher method + def handle_error(self): + """Log an error in handling any request, and proceed. + + """ + logging.exception("Error while handling asyncore request") + self.close_log() class MasterServer(daemon.AsyncStreamServer): @@ -88,7 +144,7 @@ class MasterServer(daemon.AsyncStreamServer): master socket. """ - def __init__(self, mainloop, address, handler_class): + def __init__(self, mainloop, address): """MasterServer constructor @type mainloop: ganeti.daemon.Mainloop @@ -99,7 +155,6 @@ class MasterServer(daemon.AsyncStreamServer): """ daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address, MasterClientHandler) - self.request_handler_class = handler_class self.mainloop = mainloop # We'll only start threads once we've forked. @@ -128,54 +183,6 @@ class MasterServer(daemon.AsyncStreamServer): self.context.jobqueue.Shutdown() -class ClientRqHandler(SocketServer.BaseRequestHandler): - """Client handler""" - READ_SIZE = 4096 - - def setup(self): - # pylint: disable-msg=W0201 - # setup() is the api for initialising for this class - self._buffer = "" - self._msgs = collections.deque() - self._ops = ClientOps(self.server) - - def handle(self): - while True: - msg = self.read_message() - if msg is None: - logging.debug("client closed connection") - break - - (method, args) = luxi.ParseRequest(msg) - - success = False - try: - result = self._ops.handle_request(method, args) - success = True - except errors.GenericError, err: - logging.exception("Unexpected exception") - result = errors.EncodeException(err) - except: - logging.exception("Unexpected exception") - result = "Caught exception: %s" % str(sys.exc_info()[1]) - - self.send_message(luxi.FormatResponse(success, result)) - - def read_message(self): - while not self._msgs: - data = self.request.recv(self.READ_SIZE) - if not data: - return None - new_msgs = (self._buffer + data).split(constants.LUXI_EOM) - self._buffer = new_msgs.pop() - self._msgs.extend(new_msgs) - return self._msgs.popleft() - - def send_message(self, msg): - # TODO: sendall is not guaranteed to send everything - self.request.sendall(msg + constants.LUXI_EOM) - - class ClientOps: """Class holding high-level client operations.""" def __init__(self, server): @@ -513,7 +520,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613 utils.RemoveFile(constants.MASTER_SOCKET) mainloop = daemon.Mainloop() - master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler) + master = MasterServer(mainloop, constants.MASTER_SOCKET) try: rpc.Init() try: -- 1.7.1
