Saggi Mizrahi has uploaded a new change for review. Change subject: [WIP] EVENTS ......................................................................
[WIP] EVENTS Change-Id: Id27b5ca1773139932eb5cb16921d5abec4991c5e Signed-off-by: Saggi Mizrahi <[email protected]> --- M lib/stompClient.py M lib/vdsm/config.py.in M lib/vdsm/sslutils.py M lib/yajsonrpc/betterAsyncore.py M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompReactor.py M tests/jsonRpcHelper.py M tests/jsonRpcTests.py M tests/miscTests.py M tests/protocoldetectorTests.py M vdsm.spec.in M vdsm/API.py M vdsm/clientIF.py M vdsm/protocoldetector.py M vdsm/rpc/BindingJsonRpc.py 15 files changed, 336 insertions(+), 141 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/69/38069/1 diff --git a/lib/stompClient.py b/lib/stompClient.py index d3cd0ab..5a6d7f5 100644 --- a/lib/stompClient.py +++ b/lib/stompClient.py @@ -1,6 +1,12 @@ #!/usr/bin/python import yajsonrpc as yjrpc -import yajsonrpc.stompReactor as sr +from yajsonrpc.stompReactor import ( + _DEFAULT_REQUEST_DESTINATION, + _DEFAULT_RESPONSE_DESTINATION, + ClientRpcTransportAdapter, + ServerRpcContextAdapter, + StompReactor, +) import socket from threading import Thread, Lock import time @@ -14,7 +20,7 @@ if _reactor is None: with _reactorLock: if _reactor is None: - _reactor = sr.StompReactor() + _reactor = StompReactor() t = Thread(target=_reactor.process_requests) t.setDaemon(True) t.start() @@ -42,95 +48,38 @@ t.setDaemon(True) t.start() sub = stomp_client.subscribe( - sr._DEFAULT_REQUEST_DESTINATION, + _DEFAULT_REQUEST_DESTINATION, message_handler=ServerRpcContextAdapter.subscription_handler(server) ) server._sub_ = sub return server -class ServerRpcContextAdapter(object): - @classmethod - def subscription_handler(cls, server): - def handler(sub, frame): - server.queueRequest( - ( - ServerRpcContextAdapter(sub.client, frame), - frame.body - ) - ) - - return handler - - def __init__(self, client, request_frame): - self._client = client - self._reply_to = request_frame.headers.get('reply-to', None) - - def get_local_address(self, *args, **kwargs): - return "" - - def send(self, data): - if self._reply_to is None: - return - - self._client.send( - self._reply_to, - data, - { - "content-type": "application/json", - } - ) - - -class ClientRpcTransportAdapter(object): - def __init__(self, sub, destination, client): - self._sub = sub - sub.set_message_handler(self._handle_message) - self._destination = destination - self._client = client - self._message_handler = lambda arg: None - - def setMessageHandler(self, handler): - self._message_handler = handler - - def send(self, data): - headers = { - "content-type": "application/json", - "reply-to": self._sub.destination, - } - self._client.send( - data, - self._destination, - headers, - ) - - def _handle_message(self, sub, frame): - self._message_handler((self, frame.body)) - - def close(self): - self._sub.unsubscribe() - - def connect(address): sock = socket.create_connection(address) reactor = get_reactor() stomp_client = reactor.createClient(sock) - subscription = stomp_client.subscribe(sr._DEFAULT_RESPONSE_DESTINATIOM) + subscription = stomp_client.subscribe( + _DEFAULT_RESPONSE_DESTINATION, + sub_id="__vdsm_fake_broker__", + ) client = yjrpc.JsonRpcClient( ClientRpcTransportAdapter( subscription, - sr._DEFAULT_REQUEST_DESTINATION, + _DEFAULT_REQUEST_DESTINATION, stomp_client, ) ) return client -BROKER_ADDRESS = ("127.0.0.1", 5445) +BROKER_ADDRESS = ("127.0.0.1", 54321) -server = dummy_server(BROKER_ADDRESS) +# server = dummy_server(BROKER_ADDRESS) time.sleep(2) client = connect(BROKER_ADDRESS) -client.callMethod("echo", ["123"], 1) +client.callMethod("Host.ping", [], 1) +client.callMethod("Host.ping", [], 1) +client.callMethod("Host.ping", [], 1) diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in index 94c6782..fb0150b 100644 --- a/lib/vdsm/config.py.in +++ b/lib/vdsm/config.py.in @@ -325,6 +325,20 @@ ('guests_gateway_ip', '', None), + ('broker_address', '127.0.0.1', + 'Address where the broker is listening at. Use an empty string ' + 'for none'), + + ('broker_port', '5445', + 'Port where the broker is listening at.'), + + ('request_queues', + 'jms.topic.vdsm_requests, jms.topic.vdsm_irs_requests', + 'Queues to subscribe to for RPC requests'), + + ('topic_prefix', 'jms.topic.', + 'Prefix to use for events'), + ]), # Section: [devel] diff --git a/lib/vdsm/sslutils.py b/lib/vdsm/sslutils.py index 74555ee..6568842 100644 --- a/lib/vdsm/sslutils.py +++ b/lib/vdsm/sslutils.py @@ -35,7 +35,7 @@ class SSLSocket(object): def __init__(self, connection): self.connection = connection - self._data = None + self._data = "" def gettimeout(self): return self.connection.socket.gettimeout() @@ -57,12 +57,14 @@ def read(self, size=4096, flag=None): result = None if flag == socket.MSG_PEEK: - self._data = self.connection.read(size) + bytes_left = size - len(self._data) + if bytes_left > 0: + self._data += self.connection.read(bytes_left) result = self._data else: if self._data: result = self._data - self._data = None + self._data = "" else: result = self.connection.read(size) return result diff --git a/lib/yajsonrpc/betterAsyncore.py b/lib/yajsonrpc/betterAsyncore.py index af88877..8656df8 100644 --- a/lib/yajsonrpc/betterAsyncore.py +++ b/lib/yajsonrpc/betterAsyncore.py @@ -18,14 +18,17 @@ # while enabling compositing instead of inheritance. import asyncore import socket -import types from errno import EWOULDBLOCK +import types from vdsm.infra.eventfd import EventFD class Dispatcher(asyncore.dispatcher): def __init__(self, impl=None, sock=None, map=None): + # This has to be done before the super initialization because + # dispatcher implements __getattr__. + self.__impl = None asyncore.dispatcher.__init__(self, sock=sock, map=map) if impl is not None: self.switch_implementation(impl) @@ -82,8 +85,9 @@ Note that this value is a reccomendation only. """ + impl = self.__impl return getattr( - self.__impl, + impl, "next_check_interval", lambda d: None )(self) @@ -100,7 +104,7 @@ self.handle_read() if hasattr(self.socket, "pending"): - while self.socket.pending() > 0: + while self.socket.pending() > 0 and self.connected: self.handle_read() def recv(self, buffer_size): @@ -115,7 +119,9 @@ return data except socket.error, why: # winsock sometimes raises ENOTCONN - if why.args[0] in asyncore._DISCONNECTED: + if why.args[0] == EWOULDBLOCK: + return None + elif why.args[0] in asyncore._DISCONNECTED: self.handle_close() return '' else: diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index a3d63f3..cf24a7f 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -16,11 +16,12 @@ import logging import os import socket -from threading import Timer from uuid import uuid4 from collections import deque import re + +FAKE_SUB_ID = "__vdsm_fake_broker__" _RE_ESCAPE_SEQUENCE = re.compile(r"\\(.)") @@ -75,7 +76,10 @@ class StompError(RuntimeError): def __init__(self, frame): - RuntimeError.__init__(self, frame.body) + self.frame = frame + super(RuntimeError, self).__init__( + self.body, + ) class _HeartBeatFrame(object): @@ -441,10 +445,9 @@ def _process_message(self, frame, dispatcher): sub_id = frame.headers.get(Headers.SUBSCRIPTION) if sub_id is None: - self.log.warning( - "Got message without a subscription" - ) - return + # This is actually an error, but because of the fake broker we + # injects a fake sub id + sub_id = "__vdsm_fake_broker__" sub = self._subscriptions.get(sub_id) if sub is None: @@ -464,7 +467,8 @@ def send(self, destination, data="", headers=None): final_headers = {"destination": destination} - final_headers.update(headers) + if headers is not None: + final_headers.update(headers) self.queue_frame(Frame( Command.SEND, final_headers, @@ -552,8 +556,3 @@ {"id": str(subid)}) client.put(frame) self._valid = False - - def __del__(self): - # Using a timer because unsubscribe action might involve taking locks. - if self._valid: - Timer(0, self.unsubscribe) diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py index 5a1b595..44fbcbb 100644 --- a/lib/yajsonrpc/stompReactor.py +++ b/lib/yajsonrpc/stompReactor.py @@ -20,15 +20,17 @@ from betterAsyncore import Dispatcher, Reactor from vdsm.sslutils import SSLSocket +from . import ( + JsonRpcClient, + JsonRpcServer, +) _STATE_LEN = "Waiting for message length" _STATE_MSG = "Waiting for message" -_DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses" +_DEFAULT_RESPONSE_DESTINATION = "jms.topic.vdsm_legacy_responses" _DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" - -_FAKE_SUB_ID = "__vdsm_fake_broker__" def parseHeartBeatHeader(v): @@ -177,13 +179,16 @@ def check_read(self): self._stompConn._dispatcher.handle_read_event() - def send(self, message): + def send( + self, + message, + destination=_DEFAULT_RESPONSE_DESTINATION, + ): self.log.debug("Sending response") res = stomp.Frame( stomp.Command.MESSAGE, { - stomp.Headers.DESTINATION: _DEFAULT_RESPONSE_DESTINATIOM, - stomp.Headers.SUBSCRIPTION: _FAKE_SUB_ID, + stomp.Headers.DESTINATION: destination, stomp.Headers.CONTENT_TYPE: "application/json", }, message @@ -325,3 +330,101 @@ dispatcher.del_channel() self.json_binding.add_socket(self._reactor, client_socket) self.log.debug("Stomp detected from %s", socket_address) + + +class ServerRpcContextAdapter(object): + @classmethod + def subscription_handler(cls, server): + def handler(sub, frame): + server.queueRequest( + ( + ServerRpcContextAdapter(sub.client, frame), + frame.body + ) + ) + + return handler + + def __init__(self, client, request_frame): + self._client = client + self._reply_to = request_frame.headers.get('reply-to', None) + + def get_local_address(self, *args, **kwargs): + return "" + + def send(self, data): + if self._reply_to is None: + return + + self._client.send( + self._reply_to, + data, + { + "content-type": "application/json", + } + ) + + +class ClientRpcTransportAdapter(object): + def __init__(self, sub, destination, client): + self._sub = sub + sub.set_message_handler(self._handle_message) + self._destination = destination + self._client = client + self._message_handler = lambda arg: None + + def setMessageHandler(self, handler): + self._message_handler = handler + + def send(self, data): + headers = { + "content-type": "application/json", + "reply-to": self._sub.destination, + } + self._client.send( + data, + self._destination, + headers, + ) + + def _handle_message(self, sub, frame): + self._message_handler((self, frame.body)) + + def close(self): + self._sub.unsubscribe() + + +def StompRpcClient( + stomp_client, + request_queue, + response_queue +): + sub_id = None + if request_queue == stomp.FAKE_SUB_ID: + sub_id = stomp.FAKE_SUB_ID + + return JsonRpcClient( + ClientRpcTransportAdapter( + stomp_client.subscribe(response_queue, sub_id=sub_id), + request_queue, + stomp_client, + ) + ) + + +def StompRpcServer( + bridge, + stomp_client, + request_queue, +): + server = JsonRpcServer( + bridge, + ) + + sub = stomp_client.subscribe( + request_queue, + message_handler=ServerRpcContextAdapter.subscription_handler(server) + ) + + server._sub_ = sub + return server diff --git a/tests/jsonRpcHelper.py b/tests/jsonRpcHelper.py index b3eb1fd..2fb7cfb 100644 --- a/tests/jsonRpcHelper.py +++ b/tests/jsonRpcHelper.py @@ -27,9 +27,17 @@ from itertools import product from M2Crypto import SSL from rpc.BindingXMLRPC import BindingXMLRPC, XmlDetector -from yajsonrpc.stompReactor import StompDetector +from yajsonrpc.betterAsyncore import ( + Reactor, +) +from yajsonrpc.stompReactor import ( + StompDetector, + StompRpcClient, +) +from yajsonrpc.stomp import ( + FAKE_SUB_ID, +) from protocoldetector import MultiProtocolAcceptor -from yajsonrpc import JsonRpcClient from rpc.BindingJsonRpc import BindingJsonRpc from sslhelper import DEAFAULT_SSL_CONTEXT @@ -60,7 +68,13 @@ @contextmanager def constructAcceptor(log, ssl, jsonBridge): sslctx = DEAFAULT_SSL_CONTEXT if ssl else None - acceptor = MultiProtocolAcceptor("127.0.0.1", 0, sslctx) + reactor = Reactor() + acceptor = MultiProtocolAcceptor( + reactor, + "127.0.0.1", + 0, + sslctx, + ) cif = FakeClientIf() xml_binding = BindingXMLRPC(cif, cif.log) @@ -73,8 +87,10 @@ stompDetector = StompDetector(json_binding) acceptor.add_detector(stompDetector) - thread = threading.Thread(target=acceptor.serve_forever, - name='Detector thread') + thread = threading.Thread( + target=reactor.process_requests, + name='Detector thread', + ) thread.setDaemon(True) thread.start() @@ -103,9 +119,12 @@ reactor = handler._reactor if not client: - client = lambda client_socket: ( - JsonRpcClient(reactor.createClient(client_socket)) - ) + def client(client_socket): + return StompRpcClient( + reactor.createClient(client_socket), + FAKE_SUB_ID, + FAKE_SUB_ID, + ) def clientFactory(): return client(create_socket( diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index 230b61a..3254393 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -83,12 +83,7 @@ @contextmanager def _client(self, clientFactory): client = clientFactory() - client.setTimeout(CALL_TIMEOUT) - client.connect() - try: - yield client - finally: - client.close() + yield client @MonkeyPatch(clientIF, 'getInstance', getInstance) @permutations(PERMUTATIONS) diff --git a/tests/miscTests.py b/tests/miscTests.py index 1fa572e..f98f539 100644 --- a/tests/miscTests.py +++ b/tests/miscTests.py @@ -991,6 +991,7 @@ openFds = openFdNum() self.testStdOut() gc.collect() + print gc.garbage self.assertEquals(len(gc.garbage), 0) self.assertEquals(openFdNum(), openFds) diff --git a/tests/protocoldetectorTests.py b/tests/protocoldetectorTests.py index ea64619..c64a1e2 100644 --- a/tests/protocoldetectorTests.py +++ b/tests/protocoldetectorTests.py @@ -23,9 +23,12 @@ import ssl import threading import time -from contextlib import contextmanager, closing +from contextlib import contextmanager import protocoldetector +from yajsonrpc.betterAsyncore import ( + Reactor, +) from vdsm import sslutils from sslhelper import KEY_FILE, CRT_FILE @@ -59,12 +62,13 @@ def run(): try: - assert client_socket.gettimeout() is None - rfile = client_socket.makefile('rb', -1) - with closing(rfile): - request = rfile.readline() - response = self.response(request) - client_socket.sendall(response) + request = "" + while "\n" not in request: + request += dispatcher.recv(1024) + + response = self.response(request) + client_socket.setblocking(1) + client_socket.sendall(response) finally: client_socket.shutdown(socket.SHUT_RDWR) client_socket.close() @@ -182,14 +186,14 @@ def check_slow_client(self, use_ssl): with self.connect(use_ssl) as client: - time.sleep(self.acceptor.CLEANUP_INTERVAL - self.GRACETIME) + time.sleep(self.acceptor._handshake_timeout - self.GRACETIME) data = "echo let me in\n" client.sendall(data) self.assertEqual(client.recv(self.BUFSIZE), data) def check_very_slow_client(self, use_ssl): with self.connect(use_ssl) as client: - time.sleep(self.acceptor.CLEANUP_INTERVAL * 2 + self.GRACETIME) + time.sleep(self.acceptor._handshake_timeout * 2 + self.GRACETIME) client.sendall("echo too slow probably\n") self.check_disconnected(client) @@ -204,12 +208,18 @@ # Helpers def start_acceptor(self, use_ssl): - self.acceptor = TestingAcceptor( - '127.0.0.1', 0, sslctx=self.SSLCTX if use_ssl else None) + self.reactor = Reactor() + self.acceptor = protocoldetector.MultiProtocolAcceptor( + self.reactor, + '127.0.0.1', + 0, + sslctx=self.SSLCTX if use_ssl else None + ) + self.acceptor._handshake_timeout = 1 self.acceptor.add_detector(Echo()) self.acceptor.add_detector(Uppercase()) - self.acceptor_address = self.acceptor._socket.getsockname() - t = threading.Thread(target=self.acceptor.serve_forever) + self.acceptor_address = self.acceptor._acceptor.socket.getsockname() + t = threading.Thread(target=self.reactor.process_requests) t.deamon = True t.start() diff --git a/vdsm.spec.in b/vdsm.spec.in index 0740530..845ddf0 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1606,6 +1606,7 @@ %{python_sitelib}/yajsonrpc/stompReactor.py* %files infra +%{python_sitelib}/%{vdsm_name}/infra/eventfd/__init__.py* %{python_sitelib}/%{vdsm_name}/infra/filecontrol/__init__.py* %{python_sitelib}/%{vdsm_name}/infra/sigutils/__init__.py* %{python_sitelib}/%{vdsm_name}/infra/zombiereaper/__init__.py* diff --git a/vdsm/API.py b/vdsm/API.py index 42c471e..fdf2c62 100644 --- a/vdsm/API.py +++ b/vdsm/API.py @@ -1267,6 +1267,9 @@ def ping(self): "Ping the server. Useful for tests" updateTimestamp() + n = self._cif.create_notification("vdsm.ping") + n.emit(pinged=True) + return {'status': doneCode} def getRoute(self, ip): @@ -1286,7 +1289,6 @@ c = caps.get() c['netConfigDirty'] = str(self._cif._netConfigDirty) c = hooks.after_get_caps(c) - return {'status': doneCode, 'info': c} def getHardwareInfo(self): diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py index d29f8c2..46e7f41 100644 --- a/vdsm/clientIF.py +++ b/vdsm/clientIF.py @@ -23,6 +23,8 @@ import time import threading import uuid +import socket +import json from functools import partial from weakref import proxy @@ -54,6 +56,35 @@ _glusterEnabled = True except ImportError: _glusterEnabled = False + +from yajsonrpc.betterAsyncore import ( + Reactor, +) + +from yajsonrpc.stompReactor import ( + StompClient, + StompRpcServer, +) + + +class Notification(object): + def __init__(self, event_id, cb): + self._event_id = event_id + self._cb = cb + + def emit(self, **kwargs): + notification = json.dumps( + { + 'jsonrpc': '2.0', + 'method': self._event_id, + 'params': kwargs, + } + ) + + self._cb( + notification, + self._event_id + ) class clientIF(object): @@ -87,6 +118,7 @@ self._generationID = str(uuid.uuid4()) self.mom = None self.bindings = {} + self._clients = [] if _glusterEnabled: self.gluster = gapi.GlusterApi(self, log) else: @@ -109,9 +141,11 @@ host = config.get('addresses', 'management_ip') port = config.getint('addresses', 'management_port') + self._createReactor() self._createAcceptor(host, port) self._prepareXMLRPCBinding() self._prepareJSONRPCBinding() + self._connectToBroker() except: self.log.error('failed to init clientIF, ' 'shutting down storage dispatcher') @@ -124,6 +158,21 @@ @property def ready(self): return (self.irs is None or self.irs.ready) and not self._recovery + + def create_notification(self, event_id): + prefix = config.get('addresses', 'topic_prefix') + return Notification( + prefix + event_id, + self._send_notification, + ) + + def _send_notification(self, message, destination): + if self._broker_client: + self._broker_client.send(message, destination) + + self.log.warn("@@@@@") + for client in self._clients[:]: + client.send(message, destination) def contEIOVms(self, sdUUID, isDomainStateValid): # This method is called everytime the onDomainStateChange @@ -158,10 +207,46 @@ cls._instance = clientIF(irs, log) return cls._instance + def _createReactor(self): + self._reactor = Reactor() + def _createAcceptor(self, host, port): sslctx = self._createSSLContext() - self._acceptor = MultiProtocolAcceptor(host, port, sslctx) + self._acceptor = MultiProtocolAcceptor( + self._reactor, + host, + port, + sslctx + ) + + def _connectToBroker(self): + broker_address = config.get('addresses', 'broker_address') + broker_port = config.getint('addresses', 'broker_port') + request_queues = config.get('addresses', 'request_queues') + self._broker_client = None + if broker_address: + sslctx = self._createSSLContext() + sock = socket.socket() + sock.connect((broker_address, broker_port)) + if sslctx: + sock = sslctx.wrapSocket(sock) + + stomp_client = StompClient(sock, self._reactor) + subs = [] + for destination in request_queues.split(","): + destination = destination.strip() + if destination: + subs.append( + StompRpcServer( + self.bindings['jsonrpc'].server, + stomp_client, + destination, + ) + ) + + self._subscriptions = subs + self._broker_client = stomp_client def _createSSLContext(self): sslctx = None @@ -200,7 +285,7 @@ 'Please make sure it is installed.') else: bridge = Bridge.DynamicBridge() - json_binding = BindingJsonRpc(bridge) + json_binding = BindingJsonRpc(bridge, self._clients) self.bindings['jsonrpc'] = json_binding stomp_detector = StompDetector(json_binding) self._acceptor.add_detector(stomp_detector) @@ -243,8 +328,10 @@ def start(self): for binding in self.bindings.values(): binding.start() - self.thread = threading.Thread(target=self._acceptor.serve_forever, - name='Detector thread') + self.thread = threading.Thread( + target=self._reactor.process_requests, + name='Reactor thread' + ) self.thread.setDaemon(True) self.thread.start() diff --git a/vdsm/protocoldetector.py b/vdsm/protocoldetector.py index 76d5a00..20a0b41 100644 --- a/vdsm/protocoldetector.py +++ b/vdsm/protocoldetector.py @@ -23,11 +23,9 @@ from M2Crypto import SSL -from vdsm.utils import traceback import vdsm.infra.filecontrol as filecontrol from yajsonrpc.betterAsyncore import ( Dispatcher, - Reactor, ) from vdsm.utils import ( @@ -48,8 +46,6 @@ filecontrol.set_close_on_exec(server_socket.fileno()) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(addr[0][4]) - - server_socket.setblocking(0) return server_socket @@ -75,6 +71,7 @@ except socket.error: pass + client.setblocking(0) self.log.info("Accepting connection from %s:%d", *client.getpeername()) self._dispatcher_factory(client) @@ -97,8 +94,16 @@ sock = dispatcher.socket try: data = sock.recv(self._required_size, socket.MSG_PEEK) - except socket.error: + except socket.error, why: + if why.args[0] == socket.EWOULDBLOCK: + return + dispatcher.handle_error() + return + + if monotonic_time() > self._give_up_at: + self.log.debug("Timed out while waiting for data") + dispatcher.close() return if len(data) < self._required_size: @@ -210,13 +215,15 @@ def __init__( self, + reactor, host, port, sslctx=None, ssl_hanshake_timeout=SSL_HANDSHAKE_TIMEOUT, ): self._sslctx = sslctx - self._reactor = Reactor() + self._reactor = reactor + sock = _create_socket(host, port) self._host, self._port = sock.getsockname() self.log.info("Listening at %s:%d", self._host, self._port) @@ -261,13 +268,6 @@ ) return dispatcher - - @traceback(on=log.name) - def serve_forever(self): - self.log.debug("Running") - required_size = max(h.REQUIRED_SIZE for h in self._handlers) - self.log.debug("Using required_size=%d", required_size) - self._reactor.process_requests() def add_detector(self, detector): self.log.debug("Adding detector %s", detector) diff --git a/vdsm/rpc/BindingJsonRpc.py b/vdsm/rpc/BindingJsonRpc.py index a5b5b4a..eeea8ba 100644 --- a/vdsm/rpc/BindingJsonRpc.py +++ b/vdsm/rpc/BindingJsonRpc.py @@ -29,14 +29,21 @@ class BindingJsonRpc(object): log = logging.getLogger('BindingJsonRpc') - def __init__(self, bridge): + def __init__(self, bridge, clients=None): self._server = JsonRpcServer(bridge, _simpleThreadFactory) + self._clients = clients self._reactors = [] + + @property + def server(self): + return self._server def add_socket(self, reactor, client_socket): reactor.createListener(client_socket, self._onAccept) def _onAccept(self, listener, client): + if self._clients is not None: + self._clients.append(client) client.setMessageHandler(self._server.queueRequest) def createStompReactor(self): -- To view, visit http://gerrit.ovirt.org/38069 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Id27b5ca1773139932eb5cb16921d5abec4991c5e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
