Saggi Mizrahi has uploaded a new change for review. Change subject: jsonrpc: Make reactor clients 1st class objects ......................................................................
jsonrpc: Make reactor clients 1st class objects Also add support for batch requests, tests to come soon Change-Id: I64e4dfeb4eece4a85a3c6d36324b1e00f366a1cd Signed-off-by: Saggi Mizrahi <[email protected]> --- M tests/jsonRpcTests.py M vdsm_api/BindingJsonRpc.py M vdsm_api/jsonrpc/__init__.py M vdsm_api/jsonrpc/protonReactor.py M vdsm_api/jsonrpc/tcpReactor.py 5 files changed, 360 insertions(+), 235 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/02/11402/1 diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index 180914d..57b18fa 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -25,6 +25,7 @@ import json import uuid import logging +from Queue import Queue from nose.plugins.skip import SkipTest @@ -73,10 +74,10 @@ @contextmanager -def _tcpServerConstructor(messageHandler): +def _tcpServerConstructor(): port = _getFreePort() address = ("localhost", port) - reactor = tcpReactor.TCPReactor(messageHandler) + reactor = tcpReactor.TCPReactor() try: yield reactor, partial(TCPReactorClient, address), address @@ -85,13 +86,13 @@ @contextmanager -def _protonServerConstructor(messageHandler): +def _protonServerConstructor(): if protonReactor is None: raise SkipTest("qpid-proton python bindings are not installed") port = _getFreePort() serverAddress = "amqp://127.0.0.1:%d/vdsm_test" % (port,) - reactor = protonReactor.ProtonReactor(messageHandler) + reactor = protonReactor.ProtonReactor() try: yield (reactor, @@ -107,19 +108,24 @@ @contextmanager -def constructReactor(tp, messageHandler): - with REACTOR_CONSTRUCTORS[tp](messageHandler) as res: +def constructReactor(tp): + with REACTOR_CONSTRUCTORS[tp]() as res: yield res @contextmanager def constructServer(tp, bridge): - server = JsonRpcServer(bridge) - with constructReactor(tp, server) as (reactor, clientFactory, laddr): + queue = Queue() + server = JsonRpcServer(bridge, queue) + with constructReactor(tp) as (reactor, clientFactory, laddr): t = threading.Thread(target=reactor.process_requests) t.setDaemon(True) t.start() - reactor.createListener(laddr) + + def _accept(listener, client): + client.setInbox(queue) + + reactor.createListener(laddr, _accept) t = threading.Thread(target=server.serve_requests) t.setDaemon(True) @@ -132,11 +138,6 @@ yield server, jsonClientFactory finally: server.stop() - - -class _EchoMessageHandler(object): - def handleMessage(self, msgCtx): - msgCtx.sendReply(msgCtx.data) class TCPReactorClient(object): @@ -255,12 +256,36 @@ self._msngr.stop() +class _EchoServer(object): + log = logging.getLogger("EchoServer") + + def __init__(self): + self._queue = Queue() + + def accept(self, l, c): + c.setInbox(self._queue) + + def serve(self): + while True: + try: + client, msg = self._queue.get() + if client is None: + return + + self.log.info("Echoing message") + client.send(msg) + except Exception: + self.log.error("EchoServer died unexpectedly", exc_info=True) + + @expandPermutations class ReactorTests(TestCaseBase): @permutations(REACTOR_TYPE_PERMUTATIONS) def test(self, reactorType): data = dummyTextGenerator(((2 ** 10) * 200)) - msgHandler = _EchoMessageHandler() + queue = Queue() + + echosrv = _EchoServer() def serve(reactor): try: @@ -268,16 +293,20 @@ except socket.error as e: pass except Exception as e: - self.log.error("Server died unexpectedly", exc_info=True) - self.fail("Server died: (%s) %s" % (type(e), e)) + self.log.error("Reactor died unexpectedly", exc_info=True) + self.fail("Reactor died: (%s) %s" % (type(e), e)) - with constructReactor(reactorType, msgHandler) as (reactor, - clientFactory, - laddr): + with constructReactor(reactorType) as \ + (reactor, clientFactory, laddr): t = threading.Thread(target=serve, args=(reactor,)) t.setDaemon(True) t.start() - reactor.createListener(laddr) + + t = threading.Thread(target=echosrv.serve) + t.setDaemon(True) + t.start() + + reactor.createListener(laddr, echosrv.accept) clientNum = 1 repeats = 1 @@ -306,6 +335,8 @@ for client in clients: client.close() + queue.put((None, None)) + class _DummyBridge(object): def echo(self, text): diff --git a/vdsm_api/BindingJsonRpc.py b/vdsm_api/BindingJsonRpc.py index a4c6d81..21f250c 100644 --- a/vdsm_api/BindingJsonRpc.py +++ b/vdsm_api/BindingJsonRpc.py @@ -16,6 +16,7 @@ import threading import logging import struct +from Queue import Queue _Size = struct.Struct("!Q") @@ -40,7 +41,9 @@ def __init__(self, bridge, backendConfig): reactors = {} self.bridge = bridge - self.server = JsonRpcServer(bridge, _simpleThreadFactory) + self._messageQueue = Queue() + self.server = JsonRpcServer(bridge, self._messageQueue, + _simpleThreadFactory) self._cfg = backendConfig for backendType, cfg in backendConfig: @@ -62,7 +65,11 @@ except KeyError: raise ValueError("cfg") - return self._reactors["tcp"].createListener((address, port)) + return self._reactors["tcp"].createListener((address, port), + self._onAccept) + + def _onAccept(self, listener, client): + client.setInbox(self._messageQueue) def _createProtonListener(self, cfg): address = cfg.get("host", "0.0.0.0") @@ -70,10 +77,10 @@ return self._reactors["amqp"].createListener((address, port)) def _createTcpReactor(self): - return TCPReactor(self.server) + return TCPReactor() def _createProtonReactor(self): - return ProtonReactor(self.server) + return ProtonReactor() def start(self): t = threading.Thread(target=self.server.serve_requests, diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py index 49e8788..c1ebf2d 100644 --- a/vdsm_api/jsonrpc/__init__.py +++ b/vdsm_api/jsonrpc/__init__.py @@ -14,7 +14,6 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import json import logging -from Queue import Queue from functools import partial __all__ = ["tcpReactor"] @@ -70,21 +69,17 @@ self.params = params self.id = reqId - def encode(self): - res = {'jsonrpc': '2.0', - 'method': self.method, - 'params': self.params, - 'id': self.id} - - return json.dumps(res, 'utf-8') - - @staticmethod - def decode(msg): + @classmethod + def decode(cls, msg): try: obj = json.loads(msg, 'utf-8') except: raise JsonRpcParseError() + return cls.fromRawObject(obj) + + @staticmethod + def fromRawObject(obj): if obj.get("jsonrpc") != "2.0": raise JsonRpcInvalidRequestError() @@ -101,6 +96,14 @@ raise JsonRpcInvalidRequestError() return JsonRpcRequest(method, params, reqId) + + def encode(self): + res = {'jsonrpc': '2.0', + 'method': self.method, + 'params': self.params, + 'id': self.id} + + return json.dumps(res, 'utf-8') def isNotification(self): return (self.id is None) @@ -138,37 +141,53 @@ return JsonRpcResponse(result, error, reqId) -class _JsonRpcRequestContext(object): - def __init__(self, ctx, queue, request): - self.request = request - self._ctx = ctx - self._queue = queue +class _JsonRpcServeRequestContext(object): + def __init__(self, client): + self._requests = [] + self._client = client + self._counter = 0 + self._requests = {} + self._responses = [] - def isNotification(self): - return self.id is None + def setRequests(self, requests): + for request in requests: + if not request.isNotification(): + self._counter += 1 + self._requests[request.id] = request - def sendReply(self, result, error): - # TBD: Should calling this for a notification raise an error or be - # ignored - resp = JsonRpcResponse(result, - error, - self.request.id) - self._queue.put_nowait(_JsonRpcResponseContext(self._ctx, resp)) + self.sendReply() + @property + def counter(self): + return self._counter -class _JsonRpcResponseContext(object): - def __init__(self, ctx, response): - self.response = response - self.ctx = ctx + def sendReply(self): + if len(self._requests) > 0: + return + encodedObjects = [] + for response in self._responses: + try: + encodedObjects.append(response.encode()) + except: # Error encoding data + response = JsonRpcResponse(None, JsonRpcInternalError, + response.id) + encodedObjects.append(response.encode()) -class JsonRpcBatchRequest(object): - def __init__(self, requests): - self._requests = requests + if len(encodedObjects) == 1: + data = encodedObjects[0] + else: + data = '[' + ','.join(encodedObjects) + ']' - def encode(self): - obj = [r.toDict() for r in self._requests] - return json.dumps(obj, 'utf-8') + self._client.send(data.encode('utf-8')) + + def addResponse(self, response): + self._responses.append(response) + + def requestDone(self, response): + del self._requests[response.id] + self.addResponse(response) + self.sendReply() class JsonRpcCall(object): @@ -202,45 +221,40 @@ class JsonRpcServer(object): log = logging.getLogger("jsonrpc.JsonRpcServer") - def __init__(self, bridge, threadFactory=None): + def __init__(self, bridge, messageQueue, threadFactory=None): self._bridge = bridge - self._workQueue = Queue() + self._workQueue = messageQueue self._threadFactory = threadFactory - def _serveRequest(self, ctx): - req = ctx.request + def _serveRequest(self, ctx, req): mangledMethod = req.method.replace(".", "_") self.log.debug("Looking for method '%s' in bridge", mangledMethod) try: method = getattr(self._bridge, mangledMethod) except AttributeError: - ctx.sendReply(None, JsonRpcMethodNotFoundError()) - else: - try: - params = req.params - if isinstance(req.params, list): - res = method(*params) - else: - res = method(**params) - except JsonRpcError as e: - ctx.sendReply(None, e) - except Exception as e: - ctx.sendReply(None, JsonRpcInternalError(str(e))) - else: - return ctx.sendReply(res, None) + if req.isNotification(): + return - def _processResponse(self, ctx): + ctx.requestDone(JsonRpcResponse(None, + JsonRpcMethodNotFoundError(), + req.id)) + return + try: - msg = ctx.response.encode() + params = req.params + if isinstance(req.params, list): + res = method(*params) + else: + res = method(**params) + except JsonRpcError as e: + ctx.requestDone(JsonRpcResponse(None, e, req.id)) except Exception as e: - # Probably result failed to be serialized as json - errResp = JsonRpcResponse(error=JsonRpcInternalError(str(e)), - reqId=ctx.response.id) - - msg = errResp.encode() - - ctx.ctx.sendReply(msg) + ctx.requestDone(JsonRpcResponse(None, + JsonRpcInternalError(str(e)), + req.id)) + else: + ctx.requestDone(JsonRpcResponse(res, None, req.id)) def serve_requests(self): while True: @@ -248,45 +262,58 @@ if obj is None: break - if isinstance(obj, _JsonRpcRequestContext): - if self._threadFactory is None: - self._serveRequest(obj) - else: - self._threadFactory(partial(self._serveRequest, obj)) - else: - self._processResponse(obj) + client, msg = obj + self._parseMessage(client, msg) - def handleMessage(self, msgCtx): - #TODO: support batch requests - req = None - error = None + def _parseMessage(self, client, msg): + ctx = _JsonRpcServeRequestContext(client) + try: - req = JsonRpcRequest.decode(msgCtx.data) - ctx = _JsonRpcRequestContext(msgCtx, self._workQueue, req) - - self._workQueue.put_nowait(ctx) - return - - except JsonRpcError as e: - self.log.error("Error processing request", exc_info=True) - error = e + rawRequests = json.loads(msg) except: - self.log.error("Unexpected error while processing request", - exc_info=True) - - error = JsonRpcInternalError() - - # Notification, don't respond even on errors - if req is not None and req.isNotification(): + ctx.addResponse(JsonRpcResponse(None, JsonRpcParseError(), None)) + ctx.sendReply() return - if req is None: - resp = JsonRpcResponse(None, error, None) + if isinstance(rawRequests, list): + # Empty batch request + if len(rawRequests) == 0: + ctx.addResponse( + JsonRpcResponse(None, JsonRpcInvalidRequestError(), + None)) + ctx.sendReply() + return else: - resp = JsonRpcResponse(None, error, req.id) + # From this point on we know it's always a list + rawRequests = [rawRequests] - ctx = _JsonRpcResponseContext(msgCtx, resp) - self._workQueue.put_nowait(ctx) + # JSON Parsed handling each request + requests = [] + for rawRequest in rawRequests: + try: + req = JsonRpcRequest.fromRawObject(rawRequest) + requests.append(req) + except JsonRpcError as err: + ctx.addResponse(JsonRpcResponse(None, err, None)) + except: + ctx.addResponse(JsonRpcResponse(None, + JsonRpcInternalError(), + None)) + + ctx.setRequests(requests) + + # No request was built successfully or is only notifications + if ctx.counter == 0: + ctx.sendReply() + + for request in requests: + self._runRequest(ctx, request) + + def _runRequest(self, ctx, request): + if self._threadFactory is None: + self._serveRequest(ctx, request) + else: + self._threadFactory(partial(self._serveRequest, ctx, request)) def stop(self): self._workQueue.put_nowait(None) diff --git a/vdsm_api/jsonrpc/protonReactor.py b/vdsm_api/jsonrpc/protonReactor.py index da903e5..307e526 100644 --- a/vdsm_api/jsonrpc/protonReactor.py +++ b/vdsm_api/jsonrpc/protonReactor.py @@ -27,45 +27,62 @@ AUTHENTICATING = 2 -class ProtonContext(object): - log = logging.getLogger("jsonrpc.ProtonContext") - - def __init__(self, reactor, messageQueue, connector, msg): - self._reactor = reactor - self._msg = msg - self._mq = messageQueue - self._connector = connector - - @property - def data(self): - return self._msg.body - - def sendReply(self, data): - msg = proton.Message() - msg.address = self._msg.reply_to - msg.body = data - self._mq.put_nowait(msg) - self.log.debug("Message Queued") - self._reactor._activate(self._connector, proton.PN_CONNECTOR_WRITABLE) - self._reactor._wakeup() - - class ProtonListener(object): - def __init__(self, address, reactor): + def __init__(self, listener, address, reactor, acceptHandler): + self._listener = listener self._reactor = reactor self._address = address + self._acceptHandler = acceptHandler + + def fileno(self): + return proton.pn_listener_fd(self._listener) def close(self): - self._reactor._scheduleOp(False, self._reactor._stop_listening, - self._address) + self._reactor._scheduleOp(False, proton.pn_listener_close, + self._listener) + + +class ProtonClient(object): + def __init__(self, reactor, connection, connector, session): + self.connector = connector + self.connection = connection + self.session = session + self.sender = None + self.links = [] + self._inbox = None + self._outbox = Queue() + self._reactor = reactor + + def _pushIncomingMessage(self, msg): + try: + self._inbox.put_nowait((self, msg)) + except AttributeError: + # Inbox not set + pass + + def _popPendingMessage(self): + return self._outbox.get_nowait() + + def fileno(self): + return proton.pn_connctor_fd(self._connector) + + def setInbox(self, queue): + self._inbox = queue + + def send(self, msg): + self._outbox.put_nowait(msg) + self._reactor._activate(self.connector, + proton.PN_CONNECTOR_WRITABLE) + + def close(self): + #TODO + pass class ProtonReactor(object): log = logging.getLogger("jsonrpc.ProtonReactor") - def __init__(self, messageHandler, deliveryTimeout=5): - self._messageHandler = messageHandler - + def __init__(self, deliveryTimeout=5): self._isRunning = False self._driver = proton.pn_driver() @@ -162,15 +179,16 @@ self.log.debug("Connection Opened.") proton.pn_connection_open(conn) - def _openPendingSessions(self, conn): + def _openPendingSessions(self, conn, connector): ssn = proton.pn_session_head(conn, proton.PN_LOCAL_UNINIT) while ssn: proton.pn_session_open(ssn) - ctx = {'connection': conn, - 'session': ssn, - 'mqueue': Queue(), - 'sender': None, - 'links': []} + ctx = ProtonClient(self, conn, connector, ssn) + l = proton.pn_connector_listener(connector) + listener = proton.pn_listener_context(l) + listener._acceptHandler(listener, ctx) + self.log.debug("Found related listener") + self._sessionContexts.append(ctx) proton.pn_session_set_context(ssn, ctx) self.log.debug("Session Opened.") @@ -249,11 +267,11 @@ self.log.debug("Closing Link") proton.pn_link_close(link) for ctx in self._sessionContexts: - if link in ctx['links']: - ctx['links'].remove(link) + if link in ctx.links: + ctx.links.remove(link) - if link == ctx['sender']: - ctx['sender'] = None + if link == ctx.sender: + ctx.sender = None link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE | proton.PN_REMOTE_CLOSED)) @@ -275,25 +293,26 @@ def _queueOutgoingDeliveries(self, conn): ctxs = (ctx for ctx in self._sessionContexts - if ctx['connection'] == conn) + if ctx.connection == conn) for ctx in ctxs: - sender = ctx['sender'] + sender = ctx.sender if sender is None: # No sender link - sender = proton.pn_sender(ctx['session'], + sender = proton.pn_sender(ctx.session, "sender-%s" % str(uuid.uuid4())) - ctx['sender'] = sender + ctx.sender = sender proton.pn_link_open(sender) continue - mq = ctx['mqueue'] try: - msg = mq.get_nowait() + data = ctx._popPendingMessage() except Empty: continue else: + msg = proton.Message() + msg.body = data self.log.debug("Creating delivery") proton.pn_link_set_context(sender, msg.encode()) if proton.pn_link_credit(sender) == 0: @@ -308,7 +327,7 @@ conn = proton.pn_connector_connection(connector) self._initConnection(conn) - self._openPendingSessions(conn) + self._openPendingSessions(conn, connector) self._openLinks(conn) self._queueOutgoingDeliveries(conn) self._processDeliveries(conn, connector) @@ -336,9 +355,7 @@ msgObj = proton.Message() msgObj.decode(msg) ctx = proton.pn_session_get_context(ssn) - mq = ctx['mqueue'] - self._messageHandler.handleMessage(ProtonContext(self, mq, connector, - msgObj)) + ctx._pushIncomingMessage(msgObj.body) proton.pn_delivery_settle(delivery) proton.pn_link_advance(link) @@ -364,24 +381,20 @@ proton.pn_delivery_set_context(delivery, time.time()) proton.pn_link_advance(link) - def createListener(self, address): + def createListener(self, address, acceptHandler): host, port = address - l = self._scheduleOp(True, proton.pn_listener, self._driver, - host, str(port), None) + return self._scheduleOp(True, self._createListener, address, + acceptHandler) + + def _createListener(self, address, acceptHandler): + host, port = address + l = proton.pn_listener(self._driver, host, str(port), None) if l is None: raise RuntimeError("Could not listen on %s:%s" % (host, port)) - self._listeners[address] = l - return ProtonListener(address, self) - - def _stop_listening(self, address): - try: - l = self._listeners[address] - except KeyError: - return - - proton.pn_listener_close(l) - del self._listeners[address] + lObj = ProtonListener(l, address, self, acceptHandler) + proton.pn_listener_set_context(l, lObj) + return lObj def _emptyCommandQueue(self): while True: @@ -417,8 +430,10 @@ self._acceptConnectionRequests() self._processConnectors() - for listener in self._listeners.keys(): - self._stop_listening(listener) + l = proton.pn_listener_head(self._driver) + while l: + proton.pn_listener_close(l) + l = proton.pn_listener_next(l) def _wakeup(self): proton.pn_driver_wakeup(self._driver) diff --git a/vdsm_api/jsonrpc/tcpReactor.py b/vdsm_api/jsonrpc/tcpReactor.py index bda9057..8c9cc7d 100644 --- a/vdsm_api/jsonrpc/tcpReactor.py +++ b/vdsm_api/jsonrpc/tcpReactor.py @@ -81,26 +81,68 @@ return res -class TCPMessageContext(object): - def __init__(self, server, conn, data): - self._server = server - self._conn = conn - self._data = data +class TCPClient(object): + def __init__(self, reactor, sock): + self._sock = sock + self._conn = TCPConnection(sock) + self._inbox = None + self._reactor = reactor - @property - def data(self): - return self._data + def _pushRecievedMessage(self, msg): + try: + self._inbox.put_nowait((self, msg)) + except AttributeError: + # Inbox not set + pass - def sendReply(self, data): - self._server.sendReply(self, data) + def fileno(self): + return self._sock.fileno() + + def setInbox(self, queue): + self._inbox = queue + + def send(self, message): + self._conn.addSendData(_Size.pack(len(message)) + message) + self._reactor.wakeup() + + def _processInput(self): + msg = self._conn.processInput() + while msg is not None: + self._pushRecievedMessage(msg) + msg = self._conn.processInput() + + def _processOutput(self): + self._conn.processOutput() + + def _hasSendData(self): + return self._conn.hasSendData() + + def close(self): + self._sock.close() class TCPListener(object): - def __init__(self, address): + log = logging.getLogger("jsonrpc.TCPListener") + + def __init__(self, reactor, address, acceptHandler): self._address = address self.sock = sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(address) sock.listen(10) + self._acceptHandler = acceptHandler + self._reactor = reactor + + def _accept(self): + sock, addr = self.sock.accept() + + client = TCPClient(self._reactor, sock) + try: + self._acceptHandler(self, client) + except: + self.log.warning("Accept handler threw an unexpected exception", + exc_info=True) + + return client def fileno(self): return self.sock.fileno() @@ -112,83 +154,86 @@ class TCPReactor(object): log = logging.getLogger("jsonrpc.TCPReactor") - def __init__(self, messageHandler): - self._messageHandler = messageHandler + def __init__(self): self._inputEvent = utils.PollEvent() # TODO: Close on exec - self._listeners = {} + self._trackedObjects = set() + self._isRunning = False - def createListener(self, address): - l = TCPListener(address) + def createListener(self, address, acceptHandler): + l = TCPListener(self, address, acceptHandler) - self._listeners[address] = l + self._trackedObjects.add(l) self._inputEvent.set() return l + def wakeup(self): + self._inputEvent.set() + def process_requests(self): poller = poll() - connections = {} self.log.debug("Starting to accept clients") - listenerFDs = {} + objMap = {} poller.register(self._inputEvent, POLLIN | POLLPRI) # TODO: Exist condition while True: - for l in self._listeners.values(): + for obj in self._trackedObjects: try: - fd = l.fileno() + fd = obj.fileno() except: continue - if fd not in listenerFDs: - listenerFDs[fd] = l + if fd not in objMap: + objMap[fd] = obj poller.register(fd, POLLIN | POLLPRI) - for fd, conn in connections.iteritems(): - if conn.hasSendData(): + for fd, obj in objMap.iteritems(): + if not isinstance(obj, TCPClient): + continue + + if obj._hasSendData(): poller.modify(fd, (POLLIN | POLLPRI | POLLOUT)) else: poller.modify(fd, (POLLIN | POLLPRI)) for fd, ev in poller.poll(): + if fd == self._inputEvent.fileno(): + self._inputEvent.clear() + continue + + obj = objMap[fd] if ev & (POLLERR | POLLHUP): - if fd in listenerFDs: + if isinstance(obj, TCPListener): self.log.info("Listening socket closed") - del listenerFDs[fd] else: self.log.debug("Connection closed") - del connections[fd] + self._trackedObjects.discard(obj) + del objMap[fd] poller.unregister(fd) - elif fd == self._inputEvent.fileno(): - self._inputEvent.clear() - - elif fd in listenerFDs: + elif isinstance(obj, TCPListener): try: - conn, addr = listenerFDs[fd].sock.accept() + client = obj._accept() except (OSError, IOError): continue self.log.debug("Processing new connection") - connections[conn.fileno()] = TCPConnection(conn) - poller.register(conn, (POLLIN | POLLPRI)) - else: - conn = connections[fd] - if ev & (POLLIN | POLLPRI): - msg = conn.processInput() - while msg is not None: - ctx = TCPMessageContext(self, conn, msg) - self._messageHandler.handleMessage(ctx) - msg = conn.processInput() - if ev & POLLOUT: - conn.processOutput() + self._trackedObjects.add(client) + else: # TCPClient + try: + if ev & (POLLIN | POLLPRI): + obj._processInput() - def sendReply(self, ctx, message): - conn = ctx._conn - conn.addSendData(_Size.pack(len(message)) + message) - self._inputEvent.set() + if ev & POLLOUT: + obj._processOutput() + except: + poller.unregister(fd) + obj.close() + self._trackedObjects.discard(obj) + del objMap[fd] def stop(self): - for listener in self._listeners.itervalues(): - listener.close() + for obj in self._trackedObjects: + obj.close() -- To view, visit http://gerrit.ovirt.org/11402 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I64e4dfeb4eece4a85a3c6d36324b1e00f366a1cd Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
