With this patch we convert the MasterClientHandler class to async_chat.
ClientRqHandler is completely replaced, and now it's the main thread
that reads asynchronously from all sockets. Then each luxi request is
dispatched as a separate job to a ClientRequestWorker. This way, merely
connecting a client to masterd no longer wastes a thread to serve the
connection, which makes DoSing the server by just generating luxi
connections a lot harder.

More work on wait-for-job-changes is needed before the
CLIENT_REQUEST_WORKERS size can be reduced.

Signed-off-by: Guido Trotter <[email protected]>
---
 daemons/ganeti-masterd |  125 +++++++++++++++++++++++++----------------------
 1 files changed, 66 insertions(+), 59 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index 1ca2199..b0337f8 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -31,10 +31,10 @@ inheritance from parent classes requires it.
 
 import sys
 import socket
-import SocketServer
 import time
 import collections
 import logging
+import asynchat
 
 from optparse import OptionParser
 
@@ -62,23 +62,79 @@ EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
 
 class ClientRequestWorker(workerpool.BaseWorker):
    # pylint: disable-msg=W0221
-  def RunTask(self, server, request, client_address):
+  def RunTask(self, server, message, client):
     """Process the request.
 
     """
+    client_ops = ClientOps(server)
+
     try:
-      server.request_handler_class(request, client_address, server)
-    finally:
-      request.close()
+      (method, args) = luxi.ParseRequest(message)
+    except luxi.ProtocolError, err:
+      logging.error("Protocol Error: %s" % err)
+      client.close_log()
+      return
 
+    success = False
+    try:
+      result = client_ops.handle_request(method, args)
+      success = True
+    except errors.GenericError, err:
+      logging.exception("Unexpected exception")
+      success = False
+      result = errors.EncodeException(err)
+    except:
+      logging.exception("Unexpected exception")
+      err = sys.exc_info()
+      result = "Caught exception: %s" % str(err[1])
 
-class MasterClientHandler(object):
-  # TODO: change this to a real asyncore server
+    # FIXME: send data via push :)
+    client.connected_socket.sendall(luxi.FormatResponse(success, result) + constants.LUXI_EOM)
+    #client.push(serializer.DumpJson(response) + constants.LUXI_EOM)
+
+
+class MasterClientHandler(asynchat.async_chat):
   """Handler for master requests.
 
   """
   def __init__(self, server, connected_socket, socket_family, client_address):
-    server.request_workers.AddTask(server, connected_socket, client_address)
+    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
+    # using a positional argument rather than a keyword one.
+    asynchat.async_chat.__init__(self, connected_socket)
+    self.ibuffer = []
+    self.server = server
+    self.socket_family = socket_family
+    self.client_address = client_address
+    self.set_terminator(constants.LUXI_EOM)
+    self.connected_socket = connected_socket
+
+  # this method is overriding an asynchat.async_chat method
+  def collect_incoming_data(self, data):
+    self.ibuffer.append(data)
+
+  # this method is overriding an asynchat.async_chat method
+  def found_terminator(self):
+    message = "".join(self.ibuffer)
+    self.ibuffer = []
+    self.server.request_workers.AddTask(self.server, message, self)
+
+  def close_log(self):
+    logging.info("closing connection from %s"
+                   % self.server.format_address(self.socket_family,
+                                                self.client_address))
+    self.close()
+
+  # this method is overriding an asyncore.dispatcher method
+  def handle_expt(self):
+    self.close_log()
+
+  # this method is overriding an asyncore.dispatcher method
+  def handle_error(self):
+    """Log an error in handling any request, and proceed.
+
+    """
+    logging.exception("Error while handling asyncore request")
+    self.close_log()
 
 
 class MasterServer(daemon.AsyncStreamServer):
@@ -88,7 +144,7 @@ class MasterServer(daemon.AsyncStreamServer):
   master socket.
 
   """
-  def __init__(self, mainloop, address, handler_class):
+  def __init__(self, mainloop, address):
     """MasterServer constructor
 
     @type mainloop: ganeti.daemon.Mainloop
@@ -99,7 +155,6 @@ class MasterServer(daemon.AsyncStreamServer):
     """
     daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address,
                                       MasterClientHandler)
-    self.request_handler_class = handler_class
     self.mainloop = mainloop
 
     # We'll only start threads once we've forked.
@@ -128,54 +183,6 @@ class MasterServer(daemon.AsyncStreamServer):
         self.context.jobqueue.Shutdown()
 
 
-class ClientRqHandler(SocketServer.BaseRequestHandler):
-  """Client handler"""
-  READ_SIZE = 4096
-
-  def setup(self):
-    # pylint: disable-msg=W0201
-    # setup() is the api for initialising for this class
-    self._buffer = ""
-    self._msgs = collections.deque()
-    self._ops = ClientOps(self.server)
-
-  def handle(self):
-    while True:
-      msg = self.read_message()
-      if msg is None:
-        logging.debug("client closed connection")
-        break
-
-      (method, args) = luxi.ParseRequest(msg)
-
-      success = False
-      try:
-        result = self._ops.handle_request(method, args)
-        success = True
-      except errors.GenericError, err:
-        logging.exception("Unexpected exception")
-        result = errors.EncodeException(err)
-      except:
-        logging.exception("Unexpected exception")
-        result = "Caught exception: %s" % str(sys.exc_info()[1])
-
-      self.send_message(luxi.FormatResponse(success, result))
-
-  def read_message(self):
-    while not self._msgs:
-      data = self.request.recv(self.READ_SIZE)
-      if not data:
-        return None
-      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
-      self._buffer = new_msgs.pop()
-      self._msgs.extend(new_msgs)
-    return self._msgs.popleft()
-
-  def send_message(self, msg):
-    # TODO: sendall is not guaranteed to send everything
-    self.request.sendall(msg + constants.LUXI_EOM)
-
-
 class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
@@ -513,7 +520,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
   utils.RemoveFile(constants.MASTER_SOCKET)
 
   mainloop = daemon.Mainloop()
-  master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
+  master = MasterServer(mainloop, constants.MASTER_SOCKET)
   try:
     rpc.Init()
     try:
-- 
1.7.1

Reply via email to