Piotr Kliczewski has uploaded a new change for review. Change subject: subscriptions ......................................................................
subscriptions Change-Id: I1493070f2ba66ca9d39a6661876c82c4727cad62 Signed-off-by: pkliczewski <[email protected]> --- M lib/stompClient.py M lib/yajsonrpc/__init__.py M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompReactor.py M tests/jsonRpcHelper.py M vdsm/rpc/BindingJsonRpc.py 6 files changed, 177 insertions(+), 80 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/51/38451/1 diff --git a/lib/stompClient.py b/lib/stompClient.py index 5a6d7f5..db6f3d0 100644 --- a/lib/stompClient.py +++ b/lib/stompClient.py @@ -10,6 +10,7 @@ import socket from threading import Thread, Lock import time +import uuid _reactor = None _reactorLock = Lock() @@ -61,7 +62,7 @@ stomp_client = reactor.createClient(sock) subscription = stomp_client.subscribe( _DEFAULT_RESPONSE_DESTINATION, - sub_id="__vdsm_fake_broker__", + sub_id=str(uuid.uuid4()), ) client = yjrpc.JsonRpcClient( ClientRpcTransportAdapter( diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py index 0bf3a54..f12d19b 100644 --- a/lib/yajsonrpc/__init__.py +++ b/lib/yajsonrpc/__init__.py @@ -205,9 +205,10 @@ class _JsonRpcServeRequestContext(object): - def __init__(self, client): + def __init__(self, client, conn): self._requests = [] self.client = client + self.conn = conn self._counter = 0 self._requests = {} self._responses = [] @@ -441,7 +442,7 @@ try: params = req.params - server_address = ctx.client.get_local_address() + server_address = ctx.conn.get_local_address() self._bridge.register_server_address(server_address) if isinstance(req.params, list): res = method(*params) @@ -469,11 +470,11 @@ if obj is None: break - client, msg = obj - self._parseMessage(client, msg) + client, conn, msg = obj + self._parseMessage(client, conn, msg) - def _parseMessage(self, client, msg): - ctx = _JsonRpcServeRequestContext(client) + def _parseMessage(self, client, conn, msg): + ctx = _JsonRpcServeRequestContext(client, conn) try: rawRequests = json.loads(msg) diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index 0dd6617..8b0a296 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -21,8 +21,9 @@ from vdsm.utils import monotonic_time import re +LEGACY_SUB_ID_REQ = "/queue/_local/vdsm/requests" +LEGACY_SUB_ID_RES = "/queue/_local/vdsm/reponses" -FAKE_SUB_ID = "__vdsm_fake_broker__" _RE_ESCAPE_SEQUENCE = re.compile(r"\\(.)") @@ -296,8 +297,9 @@ class AsyncDispatcher(object): log = logging.getLogger("stomp.AsyncDispatcher") - def __init__(self, frame_handler, bufferSize=4096): + def __init__(self, connection, frame_handler, bufferSize=4096): self._frame_handler = frame_handler + self.connection = connection self._bufferSize = bufferSize self._parser = Parser() self._outbuf = None @@ -445,11 +447,6 @@ def _process_message(self, frame, dispatcher): sub_id = frame.headers.get(Headers.SUBSCRIPTION) - if sub_id is None: - # This is actually an error, but because of the fake broker we - # injects a fake sub id - sub_id = "__vdsm_fake_broker__" - sub = self._subscriptions.get(sub_id) if sub is None: self.log.warning( @@ -458,7 +455,7 @@ ) return - sub._handle_message(frame) + sub.handle_message(frame) def _process_receipt(self, frame, dispatcher): self.log.warning("Receipt frame received and ignored") @@ -529,7 +526,7 @@ self._message_handler = message_handler self._destination = destination - def _handle_message(self, frame): + def handle_message(self, frame): self._message_handler(self, frame) def set_message_handler(self, handler): diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py index 9ea45c0..c94fdc7 100644 --- a/lib/yajsonrpc/stompReactor.py +++ b/lib/yajsonrpc/stompReactor.py @@ -15,9 +15,11 @@ import logging from collections import deque +from uuid import uuid4 +import json import stomp - +from vdsm import utils from betterAsyncore import Dispatcher, Reactor from vdsm.sslutils import SSLSocket from . import ( @@ -55,10 +57,12 @@ class StompAdapterImpl(object): log = logging.getLogger("Broker.StompAdapter") - def __init__(self, reactor, messageHandler): + def __init__(self, reactor, sub_map, req_dest): self._reactor = reactor self._outbox = deque() - self._messageHandler = messageHandler + self._sub_dests = sub_map + self._req_dest = req_dest + self._sub_ids = {} self._commands = { stomp.Command.CONNECT: self._cmd_connect, stomp.Command.SEND: self._cmd_send, @@ -106,13 +110,99 @@ self._reactor.wakeup() def _cmd_subscribe(self, dispatcher, frame): - self.log.debug("Subscribe command ignored") + self.log.debug("Subscribe command received") + destination = frame.headers.get("destination", None) + sub_id = frame.headers.get("id", None) + + if not destination or not sub_id: + self._send_error("Missing required header", + dispatcher.connection) + return + + ack = frame.headers.get("ack", stomp.AckMode.AUTO) + subscription = stomp.Subscription(dispatcher.connection, destination, + sub_id, ack, None) + + # TODO make sure to cleanup when connection lost + try: + subs = self._sub_dests[destination] + except KeyError: + self._sub_dests[destination] = [subscription] + else: + subs.append(subscription) + + self._sub_ids[sub_id] = subscription + + def _send_error(self, msg, connection): + res = stomp.Frame( + stomp.Command.ERROR, + None, + msg + ) + connection.send_raw(res) def _cmd_unsubscribe(self, dispatcher, frame): - self.log.debug("Unsubscribe command ignored") + self.log.debug("Unsubscribe command received") + sub_id = frame.headers.get("id", None) + + if not sub_id: + self._send_error("Missing required header", + dispatcher.connection) + return + + subscription = self.subs_ids[sub_id] + if not subscription: + # ignore + return + + del self.subs_ids[sub_id] + subs = self._sub_dests[subscription.destination] + if len(subs) == 1: + del self._sub_dests[subscription.destination] + else: + if subscription in subs: + subs.remove(subscription) def _cmd_send(self, dispatcher, frame): - self._messageHandler(self, frame.body) + destination = frame.headers.get(stomp.Headers.DESTINATION, None) + if _DEFAULT_REQUEST_DESTINATION == destination: + # default subscription + try: + reqs = json.loads(frame.body) + self._req_dest[reqs[0].get("id")] = frame.headers.get( + stomp.Headers.REPLY_TO + ) + except: + # let the jsonrpc server to reply + pass + + dispatcher.connection.handleMessage(frame.body) + return + elif stomp.LEGACY_SUB_ID_REQ == destination: + try: + reqs = json.loads(frame.body) + self._req_dest[reqs[0].get("id")] = stomp.LEGACY_SUB_ID_RES + except: + # let the jsonrpc server to reply + pass + dispatcher.connection.handleMessage(frame.body) + return + + subs = self._sub_dests[destination] + if len(subs) == 0: + self._send_error("Subscription not available", + dispatcher.connection) + return + + for subscription in subs: + headers = utils.picklecopy(frame.headers) + headers[stomp.Headers.SUBSCRIPTION] = subscription.id + res = stomp.Frame( + stomp.Command.MESSAGE, + headers, + frame.body + ) + subscription.client.send_raw(res) def handle_frame(self, dispatcher, frame): self.log.debug("Handling message %s", frame) @@ -124,13 +214,15 @@ class _StompConnection(object): - def __init__(self, aclient, sock, reactor): + log = logging.getLogger("yajsonrpc.StompServer") + + def __init__(self, server, aclient, sock, reactor): self._socket = sock self._reactor = reactor - self._messageHandler = None + self._server = server self._aclient = aclient - adisp = self._adisp = stomp.AsyncDispatcher(aclient) + adisp = self._adisp = stomp.AsyncDispatcher(self, aclient) self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map) def send_raw(self, msg): @@ -146,38 +238,32 @@ def close(self): self._dispatcher.close() + def get_local_address(self): + return self._socket.getsockname()[0] + + def setMessageHandler(self, msgHandler): + self._messageHandler = msgHandler + self._dispatcher.handle_read_event() + + def handleMessage(self, data): + if self._messageHandler is not None: + self._messageHandler((self._server, self, data)) + class StompServer(object): log = logging.getLogger("yajsonrpc.StompServer") - def __init__(self, sock, reactor): + def __init__(self, reactor): self._reactor = reactor self._messageHandler = None - self._socket = sock + self._sub_map = {} + self._req_dest = {} - adapter = StompAdapterImpl(reactor, self._handleMessage) - self._stompConn = _StompConnection( - adapter, - sock, - reactor, - ) - - def setTimeout(self, timeout): - self._stompConn.setTimeout(timeout) - - def connect(self): - self._stompConn.connect() - - def _handleMessage(self, impl, data): - if self._messageHandler is not None: - self._messageHandler((self, data)) - - def setMessageHandler(self, msgHandler): - self._messageHandler = msgHandler - self.check_read() - - def check_read(self): - self._stompConn._dispatcher.handle_read_event() + def add_client(self, sock): + adapter = StompAdapterImpl(self._reactor, self._sub_map, + self._req_dest) + return _StompConnection(self, adapter, sock, + self._reactor) def send( self, @@ -185,21 +271,31 @@ destination=_DEFAULT_RESPONSE_DESTINATION, ): self.log.debug("Sending response") - res = stomp.Frame( - stomp.Command.MESSAGE, - { - stomp.Headers.DESTINATION: destination, - stomp.Headers.CONTENT_TYPE: "application/json", - }, - message - ) - self._stompConn.send_raw(res) + + resp = json.loads(message) + try: + id = resp.get("id") + destination = self._req_dest[id] + del self._req_dest[id] + except: + # we could have no reply-to + pass + + for connection in self._sub_map[destination]: + res = stomp.Frame( + stomp.Command.MESSAGE, + { + stomp.Headers.DESTINATION: destination, + stomp.Headers.CONTENT_TYPE: "application/json", + stomp.Headers.SUBSCRIPTION: connection.id + }, + message + ) + connection.client.send_raw(res) def close(self): - self._stompConn.close() - - def get_local_address(self): - return self._socket.getsockname()[0] + for connection in self._sub_map.values(): + connection.close() class StompClient(object): @@ -212,6 +308,7 @@ self._aclient = stomp.AsyncClient("vdsm") self._stompConn = _StompConnection( + self, self._aclient, sock, reactor @@ -226,7 +323,7 @@ def handle_message(self, sub, frame): if self._messageHandler is not None: - self._messageHandler((self, frame.body)) + self._messageHandler((self, self, frame.body)) def setMessageHandler(self, msgHandler): self._messageHandler = msgHandler @@ -241,7 +338,9 @@ *args, **kwargs ): - return self._aclient.subscribe(*args, **kwargs) + sub = self._aclient.subscribe(*args, **kwargs) + self._reactor.wakeup() + return sub def send( self, @@ -264,8 +363,8 @@ self._stompConn.close() -def StompListener(reactor, acceptHandler, connected_socket): - impl = StompListenerImpl(reactor, acceptHandler, connected_socket) +def StompListener(reactor, server, acceptHandler, connected_socket): + impl = StompListenerImpl(server, acceptHandler, connected_socket) return Dispatcher(impl, connected_socket, map=reactor._map) @@ -275,16 +374,16 @@ class StompListenerImpl(object): log = logging.getLogger("jsonrpc.StompListener") - def __init__(self, reactor, acceptHandler, connected_socket): - self._reactor = reactor + def __init__(self, server, acceptHandler, connected_socket): self._socket = connected_socket self._acceptHandler = acceptHandler + self._server = server def init(self, dispatcher): dispatcher.set_reuse_addr() - client = StompServer(self._socket, self._reactor) - self._acceptHandler(self, client) + conn = self._server.add_client(self._socket) + self._acceptHandler(conn) def writable(self, dispatcher): return False @@ -293,10 +392,12 @@ class StompReactor(object): def __init__(self): self._reactor = Reactor() + self._server = StompServer(self._reactor) def createListener(self, connected_socket, acceptHandler): listener = StompListener( self._reactor, + self._server, acceptHandler, connected_socket ) @@ -399,13 +500,9 @@ request_queue, response_queue ): - sub_id = None - if request_queue == stomp.FAKE_SUB_ID: - sub_id = stomp.FAKE_SUB_ID - return JsonRpcClient( ClientRpcTransportAdapter( - stomp_client.subscribe(response_queue, sub_id=sub_id), + stomp_client.subscribe(response_queue, sub_id=str(uuid4())), request_queue, stomp_client, ) diff --git a/tests/jsonRpcHelper.py b/tests/jsonRpcHelper.py index 2fb7cfb..ad9c00a 100644 --- a/tests/jsonRpcHelper.py +++ b/tests/jsonRpcHelper.py @@ -35,7 +35,8 @@ StompRpcClient, ) from yajsonrpc.stomp import ( - FAKE_SUB_ID, + LEGACY_SUB_ID_REQ, + LEGACY_SUB_ID_RES, ) from protocoldetector import MultiProtocolAcceptor from rpc.BindingJsonRpc import BindingJsonRpc @@ -122,8 +123,8 @@ def client(client_socket): return StompRpcClient( reactor.createClient(client_socket), - FAKE_SUB_ID, - FAKE_SUB_ID, + LEGACY_SUB_ID_REQ, + LEGACY_SUB_ID_RES, ) def clientFactory(): diff --git a/vdsm/rpc/BindingJsonRpc.py b/vdsm/rpc/BindingJsonRpc.py index eeea8ba..dc2d97e 100644 --- a/vdsm/rpc/BindingJsonRpc.py +++ b/vdsm/rpc/BindingJsonRpc.py @@ -41,7 +41,7 @@ def add_socket(self, reactor, client_socket): reactor.createListener(client_socket, self._onAccept) - def _onAccept(self, listener, client): + def _onAccept(self, client): if self._clients is not None: self._clients.append(client) client.setMessageHandler(self._server.queueRequest) -- To view, visit https://gerrit.ovirt.org/38451 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I1493070f2ba66ca9d39a6661876c82c4727cad62 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Piotr Kliczewski <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
