Saggi Mizrahi has uploaded a new change for review.

Change subject: broker_support
......................................................................
broker_support Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Signed-off-by: Saggi Mizrahi <[email protected]> --- A lib/stompClient.py M lib/yajsonrpc/__init__.py M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompReactor.py 4 files changed, 340 insertions(+), 215 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/68/36368/1 diff --git a/lib/stompClient.py b/lib/stompClient.py new file mode 100644 index 0000000..d3cd0ab --- /dev/null +++ b/lib/stompClient.py @@ -0,0 +1,136 @@ +#!/usr/bin/python +import yajsonrpc as yjrpc +import yajsonrpc.stompReactor as sr +import socket +from threading import Thread, Lock +import time + +_reactor = None +_reactorLock = Lock() + + +def get_reactor(): + global _reactor + if _reactor is None: + with _reactorLock: + if _reactor is None: + _reactor = sr.StompReactor() + t = Thread(target=_reactor.process_requests) + t.setDaemon(True) + t.start() + + return _reactor + + +class EchoServer(object): + def echo(self, text): + return text + + def register_server_address(self, *args, **kwargs): + pass + + def unregister_server_address(self, *args, **kwargs): + pass + + +def dummy_server(address): + sock = socket.create_connection(address) + reactor = get_reactor() + stomp_client = reactor.createClient(sock) + server = yjrpc.JsonRpcServer(EchoServer()) + t = Thread(target=server.serve_requests) + t.setDaemon(True) + t.start() + sub = stomp_client.subscribe( + sr._DEFAULT_REQUEST_DESTINATION, + message_handler=ServerRpcContextAdapter.subscription_handler(server) + ) + server._sub_ = sub + return server + + +class ServerRpcContextAdapter(object): + @classmethod + def subscription_handler(cls, server): + def handler(sub, frame): + server.queueRequest( + ( + ServerRpcContextAdapter(sub.client, frame), + frame.body + ) + ) + + return handler + + def __init__(self, client, request_frame): + self._client = client + self._reply_to = request_frame.headers.get('reply-to', None) + + def get_local_address(self, *args, **kwargs): + 
return "" + + def send(self, data): + if self._reply_to is None: + return + + self._client.send( + self._reply_to, + data, + { + "content-type": "application/json", + } + ) + + +class ClientRpcTransportAdapter(object): + def __init__(self, sub, destination, client): + self._sub = sub + sub.set_message_handler(self._handle_message) + self._destination = destination + self._client = client + self._message_handler = lambda arg: None + + def setMessageHandler(self, handler): + self._message_handler = handler + + def send(self, data): + headers = { + "content-type": "application/json", + "reply-to": self._sub.destination, + } + self._client.send( + data, + self._destination, + headers, + ) + + def _handle_message(self, sub, frame): + self._message_handler((self, frame.body)) + + def close(self): + self._sub.unsubscribe() + + +def connect(address): + sock = socket.create_connection(address) + reactor = get_reactor() + stomp_client = reactor.createClient(sock) + subscription = stomp_client.subscribe(sr._DEFAULT_RESPONSE_DESTINATIOM) + client = yjrpc.JsonRpcClient( + ClientRpcTransportAdapter( + subscription, + sr._DEFAULT_REQUEST_DESTINATION, + stomp_client, + ) + ) + return client + + +BROKER_ADDRESS = ("127.0.0.1", 5445) + +server = dummy_server(BROKER_ADDRESS) + +time.sleep(2) + +client = connect(BROKER_ADDRESS) +client.callMethod("echo", ["123"], 1) diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py index 8120c03..6e485bf 100644 --- a/lib/yajsonrpc/__init__.py +++ b/lib/yajsonrpc/__init__.py @@ -50,7 +50,7 @@ log = logging.getLogger("JsonRpcInvalidRequestError") def __init__(self, object_name, msg_content): - self.log.error("Invalid message found " + msg_content) + self.log.error("Invalid message found %s", msg_content) JsonRpcError.__init__(self, -32600, "The JSON sent is not a valid Request object " "with " + object_name) @@ -100,7 +100,7 @@ raise JsonRpcInvalidRequestError("missing method header", obj) reqId = obj.get("id") - if not 
isinstance(reqId, (str, unicode)): + if not isinstance(reqId, (str, unicode, int)): raise JsonRpcInvalidRequestError("missing request identifier", obj) @@ -151,19 +151,7 @@ @staticmethod def decode(msg): obj = json.loads(msg, 'utf-8') - - if "result" not in obj and "error" not in obj: - raise JsonRpcInvalidRequestError("missing result or error info", - obj) - - result = obj.get('result') - error = JsonRpcError(**obj.get('error')) - - reqId = obj.get('id') - if not isinstance(reqId, (str, unicode)): - raise JsonRpcInvalidRequestError("missing response identifier", - obj) - return JsonRpcResponse(result, error, reqId) + return JsonRpcResponse.fromRawObject(obj) @staticmethod def fromRawObject(obj): @@ -178,9 +166,6 @@ error = obj.get("error") reqId = obj.get("id") - if not isinstance(reqId, (str, unicode)): - raise JsonRpcInvalidRequestError("missing response identifier", - obj) return JsonRpcResponse(result, error, reqId) @@ -297,12 +282,6 @@ self._lock = Lock() self._eventcbs = [] - def setTimeout(self, timeout): - self._transport.setTimeout(timeout) - - def connect(self): - self._transport.connect() - def callMethod(self, methodName, params=[], rid=None): resp = self.call(JsonRpcRequest(methodName, params, rid))[0] if resp.error: @@ -354,6 +333,12 @@ resp = JsonRpcResponse.fromRawObject(resp) with self._lock: + if resp.id is None: + self.log.warning( + "Got an error from server without an ID (%s)", + resp.error, + ) + ctx = self._runningRequests.pop(resp.id) ctx.addResponse(resp) diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index 142b22a..250b0bd 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -16,13 +16,10 @@ import logging import os import socket -from threading import Timer, Event +from threading import Timer from uuid import uuid4 from collections import deque -import time -from betterAsyncore import Dispatcher -import asyncore import re _RE_ESCAPE_SEQUENCE = re.compile(r"\\(.)") @@ -53,6 +50,16 @@ CONNECTED = "CONNECTED" 
ERROR = "ERROR" RECEIPT = "RECEIPT" + + +class Headers: + CONTENT_LENGTH = "content-length" + CONTENT_TYPE = "content-type" + SUBSCRIPTION = "subscription" + DESTINATION = "destination" + ACCEPT_VERSION = "accept-version" + REPLY_TO = "reply-to" + HOST = "host" COMMANDS = tuple([command for command in dir(Command) @@ -120,8 +127,11 @@ def decodeValue(s): # Make sure to leave this check before decoding as ':' can appear in the # value after decoding using \c - if ":" in s: - raise ValueError("Contains illigal charachter `:`") + # Disabled due to bug in hornetq: + # https://issues.jboss.org/browse/HORNETQ-1454 + + # if ":" in s: + # raise ValueError("Contains illigal charachter `:`") try: s = _RE_ESCAPE_SEQUENCE.sub( @@ -278,141 +288,15 @@ return None -class Client(object): - def __init__(self, sock=None): - """ - Initialize the client. - - The socket parameter can be an already initialized socket. Should be - used to pass specialized socket objects like SSL sockets. - """ - if sock is None: - sock = self._sock = socket.socket() - else: - self._sock = sock - - self._map = {} - # Because we don't know how large the frames are - # we have to use non bolocking IO - sock.setblocking(False) - - # We have our own timeout for operations we - # pretend to be synchronous (like connect). 
- self._timeout = None - self._connected = Event() - self._subscriptions = {} - - self._aclient = None - self._adisp = None - - self._inbox = deque() - - @property - def outgoing(self): - return self._adisp.outgoing - - def _registerSubscription(self, sub): - self._subscriptions[sub.id] = sub - - def _unregisterSubscription(self, sub): - del self._subscriptions[sub.id] - - @property - def connected(self): - return self._connected.isSet() - - def handle_connect(self, aclient, frame): - self._connected.set() - - def handle_message(self, aclient, frame): - self._inbox.append(frame) - - def process(self, timeout=None): - if timeout is None: - timeout = self._timeout - - asyncore.loop(use_poll=True, timeout=timeout, map=self._map, count=1) - - def connect(self, address, hostname): - sock = self._sock - - self._aclient = AsyncClient(self, hostname) - adisp = self._adisp = AsyncDispatcher(self._aclient) - disp = self._disp = Dispatcher(adisp, sock, self._map) - sock.setblocking(True) - disp.connect(address) - sock.setblocking(False) - self.recv() # wait for CONNECTED msg - - if not self._connected.isSet(): - sock.close() - raise socket.error() - - def recv(self): - timeout = self._timeout - s = time.time() - duration = 0 - while timeout is None or duration < timeout: - try: - return self._inbox.popleft() - except IndexError: - td = timeout - duration if timeout is not None else None - self.process(td) - duration = time.time() - s - - return None - - def put_subscribe(self, destination, ack=None): - subid = self._aclient.subscribe(self._adisp, destination, ack) - sub = Subscription(self, subid, ack) - self._registerSubscription(sub) - return sub - - def put_send(self, destination, data="", headers=None): - self._aclient.send(self._adisp, destination, data, headers) - - def put(self, frame): - self._adisp.send_raw(frame) - - def send(self): - disp = self._disp - timeout = self._timeout - duration = 0 - s = time.time() - while ((timeout is None or duration < timeout) and - 
(disp.writable() or not self._connected.isSet())): - td = timeout - duration if timeout is not None else None - self.process(td) - duration = time.time() - s - - def gettimout(self): - return self._timeout - - def settimeout(self, value): - self._timeout = value - - class AsyncDispatcher(object): log = logging.getLogger("stomp.AsyncDispatcher") - def __init__(self, frameHandler, bufferSize=4096): - self._frameHandler = frameHandler + def __init__(self, frame_handler, bufferSize=4096): + self._frame_handler = frame_handler self._bufferSize = bufferSize self._parser = Parser() - self._outbox = deque() self._outbuf = None self._outgoingHeartBeat = 0 - - def _queueFrame(self, frame): - self._outbox.append(frame) - - @property - def outgoing(self): - n = len(self._outbox) - if self._outbuf != "": - n += 1 - - return n def setHeartBeat(self, outgoing, incoming=0): if incoming != 0: @@ -422,7 +306,8 @@ self._outgoingHeartBeat = outgoing def handle_connect(self, dispatcher): - self._frameHandler.handle_connect(self) + self._outbuf = None + self._frame_handler.handle_connect(self) def handle_read(self, dispatcher): pending = self._bufferSize @@ -444,11 +329,10 @@ if data is not None: parser.parse(data) - frameHandler = self._frameHandler + frame_handler = self._frame_handler while parser.pending > 0: frame = parser.popFrame() - if hasattr(frameHandler, "handle_frame"): - frameHandler.handle_frame(self, frame) + frame_handler.handle_frame(self, frame) def popFrame(self): return self._parser.popFrame() @@ -456,7 +340,7 @@ def handle_write(self, dispatcher): if self._outbuf is None: try: - frame = self._outbox.popleft() + frame = self._frame_handler.peek_message() except IndexError: return @@ -467,14 +351,16 @@ self._lastOutgoingTimeStamp = self._milis() if numSent == len(data): self._outbuf = None + # Throw away the frame that was sent to the server + self._frame_handler.pop_message() else: self._outbuf = data[numSent:] - def send_raw(self, frame): - self._queueFrame(frame) - 
def writable(self, dispatcher): - if len(self._outbox) > 0 or self._outbuf is not None: + if self._frame_handler.has_outgoing_messages: + return True + + if self._outbuf is not None: return True if (self._outgoingHeartBeat > 0 @@ -493,51 +379,79 @@ class AsyncClient(object): - log = logging.getLogger("yajsonrpc.protocols.stomp.AsyncClient") + log = logging.getLogger("yajsonrpc.stomp.AsyncClient") - def __init__(self, frameHandler, hostname): + def __init__(self, hostname): self._hostname = hostname - self._frameHandler = frameHandler self._connected = False + self._outbox = deque() self._error = None + self._subscriptions = {} self._commands = { Command.CONNECTED: self._process_connected, Command.MESSAGE: self._process_message, Command.RECEIPT: self._process_receipt, - Command.ERROR: self._process_error} + Command.ERROR: self._process_error, + } @property def connected(self): return self._connected + def queue_frame(self, frame): + self._outbox.append(frame) + + @property + def has_outgoing_messages(self): + return (self._outbox.count > 0) + + def peek_message(self): + return self._outbox[0] + + def pop_message(self): + return self._outbox.popleft() + def getLastError(self): return self._error - def handle_connect(self, dispatcher): + def handle_connect(self): hostname = self._hostname - frame = Frame( + # TODO : reset subscriptions + # We use appendleft to make sure this is the first frame we send in + # case of a reconnect + self._outbox.appendleft(Frame( Command.CONNECT, - {"accept-version": "1.2", - "host": hostname}) - - dispatcher.send_raw(frame) + { + Headers.ACCEPT_VERSION: "1.2", + Headers.HOST: hostname, + } + )) def handle_frame(self, dispatcher, frame): self._commands[frame.command](frame, dispatcher) def _process_connected(self, frame, dispatcher): self._connected = True - frameHandler = self._frameHandler - if hasattr(frameHandler, "handle_connect"): - frameHandler.handle_connect(self, frame) self.log.debug("Stomp connection established") def 
_process_message(self, frame, dispatcher): - frameHandler = self._frameHandler + sub_id = frame.headers.get(Headers.SUBSCRIPTION) + if sub_id is None: + self.log.warning( + "Got message without a subscription" + ) + return - if hasattr(frameHandler, "handle_message"): - frameHandler.handle_message(self, frame) + sub = self._subscriptions.get(sub_id) + if sub is None: + self.log.warning( + "Got message without an unknown subscription id '%s'", + sub_id + ) + return + + sub._handle_message(frame) def _process_receipt(self, frame, dispatcher): self.log.warning("Receipt frame received and ignored") @@ -545,42 +459,86 @@ def _process_error(self, frame, dispatcher): raise StompError(frame) - def send(self, dispatcher, destination, data="", headers=None): - frame = Frame( + def send(self, destination, data="", headers=None): + final_headers = {"destination": destination} + final_headers.update(headers) + self.queue_frame(Frame( Command.SEND, - {"destination": destination}, - data) + final_headers, + data + )) - dispatcher.send_raw(frame) - - def subscribe(self, dispatcher, destination, ack=None): + def subscribe( + self, + destination, + ack=None, + sub_id=None, + message_handler=None + ): if ack is None: ack = AckMode.AUTO - subscriptionID = str(uuid4()) + if message_handler is None: + message_handler = lambda sub, frame: None - frame = Frame( + if sub_id is None: + sub_id = str(uuid4()) + + self.queue_frame(Frame( Command.SUBSCRIBE, - {"destination": destination, - "ack": ack, - "id": subscriptionID}) + { + "destination": destination, + "ack": ack, + "id": sub_id + } + )) - dispatcher.send_raw(frame) + sub = Subscription( + self, + destination, + sub_id, + ack, + message_handler, + ) + self._subscriptions[sub_id] = sub - return subscriptionID + return sub class Subscription(object): - def __init__(self, client, subid, ack): + def __init__( + self, + client, + destination, + subid, + ack, + message_handler + ): self._ack = ack self._subid = subid self._client = client 
self._valid = True + self._message_handler = message_handler + self._destination = destination + + def _handle_message(self, frame): + self._message_handler(self, frame) + + def set_message_handler(self, handler): + self._message_handler = handler @property def id(self): return self._subid + @property + def destination(self): + return self._destination + + @property + def client(self): + return self._client + def unsubscribe(self): client = self._client subid = self._subid diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py index 3aa85e8..68c21b6 100644 --- a/lib/yajsonrpc/stompReactor.py +++ b/lib/yajsonrpc/stompReactor.py @@ -17,6 +17,7 @@ import os import threading import logging +from collections import deque import stomp @@ -27,8 +28,10 @@ _STATE_MSG = "Waiting for message" -_DEFAULT_RESPONSE_DESTINATIOM = "/queue/_local/vdsm/reponses" -_DEFAULT_REQUEST_DESTINATION = "/queue/_local/vdsm/requests" +_DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses" +_DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests" + +_FAKE_SUB_ID = "__vdsm_fake_broker__" def parseHeartBeatHeader(v): @@ -55,6 +58,7 @@ def __init__(self, reactor, messageHandler): self._reactor = reactor + self._outbox = deque() self._messageHandler = messageHandler self._commands = { stomp.Command.CONNECT: self._cmd_connect, @@ -62,13 +66,30 @@ stomp.Command.SUBSCRIBE: self._cmd_subscribe, stomp.Command.UNSUBSCRIBE: self._cmd_unsubscribe} + @property + def has_outgoing_messages(self): + return (self._outbox.count > 0) + + def peek_message(self): + return self._outbox[0] + + def pop_message(self): + return self._outbox.popleft() + + def queue_frame(self, frame): + self._outbox.append(frame) + def _cmd_connect(self, dispatcher, frame): self.log.info("Processing CONNECT request") version = frame.headers.get("accept-version", None) if version != "1.2": - res = stomp.Frame(stomp.Command.ERROR, None, "Version unsupported") + resp = stomp.Frame( + stomp.Command.ERROR, 
+ None, + "Version unsupported" + ) else: - res = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) + resp = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"}) cx, cy = parseHeartBeatHeader( frame.headers.get("heart-beat", "0,0") ) @@ -79,10 +100,10 @@ # The server can send a heart-beat every cy ms and doesn't want # to receive any heart-beat from the client. - res.headers["heart-beat"] = "%d,0" % (cy,) + resp.headers["heart-beat"] = "%d,0" % (cy,) dispatcher.setHeartBeat(cy) - dispatcher.send_raw(res) + self.queue_frame(resp) self._reactor.wakeup() def _cmd_subscribe(self, dispatcher, frame): @@ -109,11 +130,12 @@ self._reactor = reactor self._messageHandler = None + self._aclient = aclient adisp = self._adisp = stomp.AsyncDispatcher(aclient) self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map) def send_raw(self, msg): - self._adisp.send_raw(msg) + self._aclient.queue_frame(msg) self._reactor.wakeup() def setTimeout(self, timeout): @@ -161,10 +183,15 @@ def send(self, message): self.log.debug("Sending response") - res = stomp.Frame(stomp.Command.MESSAGE, - {"destination": _DEFAULT_RESPONSE_DESTINATIOM, - "content-type": "application/json"}, - message) + res = stomp.Frame( + stomp.Command.MESSAGE, + { + stomp.Headers.DESTINATION: _DEFAULT_RESPONSE_DESTINATIOM, + stomp.Headers.SUBSCRIPTION: _FAKE_SUB_ID, + stomp.Headers.CONTENT_TYPE: "application/json", + }, + message + ) self._stompConn.send_raw(res) def close(self): @@ -182,12 +209,13 @@ self._messageHandler = None self._socket = sock - self._aclient = stomp.AsyncClient(self, "vdsm") + self._aclient = stomp.AsyncClient("vdsm") self._stompConn = _StompConnection( self._aclient, sock, reactor ) + self._aclient.handle_connect() def setTimeout(self, timeout): self._stompConn.setTimeout(timeout) @@ -195,7 +223,7 @@ def connect(self): self._stompConn.connect() - def handle_message(self, impl, frame): + def handle_message(self, sub, frame): if self._messageHandler is not None: 
self._messageHandler((self, frame.body)) @@ -207,11 +235,29 @@ if isinstance(self._socket, SSLSocket) and self._socket.pending() > 0: self._stompConn._dispatcher.handle_read() - def send(self, message): + def subscribe( + self, + *args, + **kwargs + ): + return self._aclient.subscribe(*args, **kwargs) + + def send( + self, + message, + destination=None, + headers=None + ): + if destination is None: + destination = _DEFAULT_REQUEST_DESTINATION + self.log.debug("Sending response") - self._aclient.send(self._stompConn, _DEFAULT_REQUEST_DESTINATION, - message, - {"content-type": "application/json"}) + self._aclient.send( + destination, + message, + headers + ) + self._reactor.wakeup() def close(self): self._stompConn.close() -- To view, visit http://gerrit.ovirt.org/36368 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
