This adds a new asyncore server that handles listening stream sockets by creating an instance of the specified asyncore handler class for every accepted connection.
Signed-off-by: Guido Trotter <[email protected]> --- lib/daemon.py | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 files changed, 85 insertions(+), 9 deletions(-) diff --git a/lib/daemon.py b/lib/daemon.py index 2fdaa76..818fadd 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -72,7 +72,90 @@ class AsyncoreScheduler(sched.scheduler): sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction) -class AsyncUDPSocket(asyncore.dispatcher): +class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher): + """Base Ganeti Asyncore Dispacher + + """ + # this method is overriding an asyncore.dispatcher method + def handle_error(self): + """Log an error in handling any request, and proceed. + + """ + logging.exception("Error while handling asyncore request") + + # this method is overriding an asyncore.dispatcher method + def writable(self): + """Most of the time we don't want to check for writability. + + """ + return False + + +class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): + """A stream server to use with asyncore. + + Each request is accepted, and then dispatched to a separate asyncore + dispatcher to handle. + + """ + + _REQUEST_QUEUE_SIZE = 5 + + def __init__(self, family, address, handler_class): + """Constructor for AsyncUnixStreamSocket + + @type family: integer + @param family: socket family (one of socket.AF_*) + @type handler_class: class + @param handler_class: class to use to handle connections + @type address: address family dependent + @param address: address to bind the socket to + + """ + GanetiBaseAsyncoreDispatcher.__init__(self) + self.family = family + self.handler_class = handler_class + self.create_socket(self.family, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(address) + self.listen(self._REQUEST_QUEUE_SIZE) + + # this method is overriding an asyncore.dispatcher method + def handle_accept(self): + """Accept a new client connection. 
+ + Creates a new instance of the handler class, which will use asyncore to + serve the client. + + """ + connected_socket, client_address = utils.IgnoreSignals(self.accept) + if self.family == socket.AF_UNIX: + # override the client address, as for unix sockets nothing meaningful is + # passed in from accept anyway + client_address = utils.GetSocketCredentials(connected_socket) + logging.info("Accepted connection from %s", + self.format_address(self.family, client_address)) + self.handler_class(self, connected_socket, self.family, client_address) + + @staticmethod + def format_address(family, address): + """Format a client's address + + @type family: integer + @param family: socket family (one of socket.AF_*) + @type address: family specific (usually tuple) + @param address: address, as reported by this class + + """ + if family == socket.AF_INET: + return "%s:%d" % address + elif family == socket.AF_UNIX: + return "pid=%s, uid=%s, gid=%s" % address + else: + return str(address) + + +class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): """An improved asyncore udp socket. """ @@ -80,7 +163,7 @@ class AsyncUDPSocket(asyncore.dispatcher): """Constructor for AsyncUDPSocket """ - asyncore.dispatcher.__init__(self) + GanetiBaseAsyncoreDispatcher.__init__(self) self._out_queue = [] self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -119,13 +202,6 @@ class AsyncUDPSocket(asyncore.dispatcher): utils.IgnoreSignals(self.sendto, payload, 0, (ip, port)) self._out_queue.pop(0) - # this method is overriding an asyncore.dispatcher method - def handle_error(self): - """Log an error in handling any request, and proceed. - - """ - logging.exception("Error while handling asyncore request") - def enqueue_send(self, ip, port, payload): """Enqueue a datagram to be sent when possible -- 1.7.1
