Piotr Kliczewski has uploaded a new change for review. Change subject: stomp: reroute messages to different process ......................................................................
stomp: reroute messages to different process When a client subscribes it can define which messages should be rerouted to it based on a custom stomp header. Whenever vdsm receives message for a verb specified it sends it to that client instead of processing it. Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d Signed-off-by: Piotr Kliczewski <piotr.kliczew...@gmail.com> --- M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompreactor.py M tests/stompAdapterTests.py 3 files changed, 161 insertions(+), 16 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/46/65846/1 diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index 558c519..99d09ac 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -563,6 +563,7 @@ self._valid = True self._message_handler = message_handler self._destination = destination + self._redirects = [] def handle_message(self, frame): self._message_handler(self, frame) @@ -576,6 +577,13 @@ def set_message_handler(self, handler): self._message_handler = handler + def set_redirects(self, redirects): + self._redirects = redirects + + @property + def redirects(self): + return self._redirects + @property def id(self): return self._subid diff --git a/lib/yajsonrpc/stompreactor.py b/lib/yajsonrpc/stompreactor.py index 42d7d01..61bb98c 100644 --- a/lib/yajsonrpc/stompreactor.py +++ b/lib/yajsonrpc/stompreactor.py @@ -15,7 +15,7 @@ from __future__ import absolute_import import logging -from collections import deque +from collections import deque, defaultdict from uuid import uuid4 import functools @@ -68,6 +68,7 @@ self._outbox = deque() self._sub_dests = sub_map self._req_dest = req_dest + self._redirects = defaultdict(list) self._sub_ids = {} request_queues = config.get('addresses', 'request_queues') self.request_queues = request_queues.split(",") @@ -128,6 +129,7 @@ self.log.info("Subscribe command received") destination = frame.headers.get("destination", None) sub_id = frame.headers.get("id", None) + redirect_value = frame.headers.get("redirect", None) if not destination or not sub_id: self._send_error("Missing destination or subscription id header", @@ -145,6 +147,12 @@ self._sub_dests[destination].append(subscription) self._sub_ids[sub_id] = subscription + + if redirect_value: + redirects = redirect_value.split(",") + subscription.set_redirects(redirects) + for redirect in redirects: + self._redirects[redirect].append(subscription) def _send_error(self, msg, connection): res = stomp.Frame( @@ -188,17 +196,26 @@ subs = self._sub_dests[subscription.destination] if len(subs) == 1: del self._sub_dests[subscription.destination] + self._remove_redirects(subscription) else: if subscription in subs: subs.remove(subscription) + self._remove_redirects(subscription) + + def _remove_redirects(self, subscription): + if subscription.redirects: + for redirect in subscription.redirects: + reds = self._redirects[redirect] + if len(reds) == 1: + del self._redirects[redirect] + else: + reds.remove(subscription) def _cmd_send(self, dispatcher, frame): destination = frame.headers.get(stomp.Headers.DESTINATION, None) if destination in self.request_queues: # default subscription - self._handle_internal(dispatcher, - frame.headers.get(stomp.Headers.REPLY_TO), - frame.body) + self._handle_internal(dispatcher, frame) return else: try: @@ -224,31 +241,42 @@ ) subscription.client.send_raw(res) - def _handle_internal(self, dispatcher, req_dest, request): + def _handle_internal(self, dispatcher, frame): """ We need to build response dictionary which maps message id with destination. For legacy mode we use known 3.5 destination or for standard mode we use 'reply-to' header. """ + requests = [] try: - self._handle_destination(dispatcher, req_dest, json.loads(request)) + req_dest = frame.headers.get(stomp.Headers.REPLY_TO) + requests = self._handle_destination(dispatcher, req_dest, + frame, json.loads(frame.body)) except Exception: # let json server process issue pass - dispatcher.connection.handleMessage(request) + for request in requests: + dispatcher.connection.handleMessage(request) - def _handle_destination(self, dispatcher, req_dest, request): + def _handle_destination(self, dispatcher, req_dest, frame, request): """ We could receive single message or batch of messages. We need to build response map for each message. """ if isinstance(request, list): - map(functools.partial(self._handle_destination, dispatcher, - req_dest), - request) - return + results = map(functools.partial(self._handle_destination, + dispatcher, req_dest, frame), + request) + return results + + if request.get("method") in self._redirects.keys(): + subscriptions = self._redirects[request.get("method")] + for subscription in subscriptions: + subscription.client.send_raw(frame) + return [] self._req_dest[request.get("id")] = req_dest + return [request] def handle_frame(self, dispatcher, frame): try: @@ -318,7 +346,7 @@ """ Sends message to all subscribes that subscribed to destination. """ - def send(self, message, destination): + def send(self, message, destination='jms.topic.vdsm_responses'): resp = json.loads(message) response_id = resp.get("id") diff --git a/tests/stompAdapterTests.py b/tests/stompAdapterTests.py index 610c722..8f98391 100644 --- a/tests/stompAdapterTests.py +++ b/tests/stompAdapterTests.py @@ -17,7 +17,7 @@ # # Refer to the README and COPYING files for full details of the license # -from collections import defaultdict +from collections import defaultdict, deque from testlib import VdsmTestCase as TestCaseBase from yajsonrpc import JsonRpcRequest @@ -36,6 +36,10 @@ def send_raw(self, msg): self._client.queue_frame(msg) + + @property + def queue(self): + return self._client._outbox def handleMessage(self, data): self._client.queue_frame(data) @@ -59,6 +63,7 @@ def __init__(self, destination, id): self._destination = destination self._id = id + self._redirects = [] def set_client(self, client): self._client = TestConnection(client) @@ -74,6 +79,29 @@ @property def client(self): return self._client + + def set_redirects(self, redirects): + self._redirects = redirects + + @property + def redirects(self): + return self._redirects + + +class TestClient(object): + + def __init__(self): + self._outbox = deque() + + def queue_frame(self, frame): + self._outbox.append(frame) + + def pop_message(self): + return self._outbox.popleft() + + @property + def has_outgoing_messages(self): + return (len(self._outbox) > 0) class ConnectFrameTest(TestCaseBase): @@ -255,7 +283,7 @@ data = adapter.pop_message() self.assertIsNot(data, None) - request = JsonRpcRequest.decode(data) + request = JsonRpcRequest.fromRawObject(data) self.assertEquals(request.method, 'Host.getAllVmStats') self.assertTrue(len(ids) == 1) @@ -278,7 +306,7 @@ data = adapter.pop_message() self.assertIsNot(data, None) - request = JsonRpcRequest.decode(data) + request = JsonRpcRequest.fromRawObject(data) self.assertEquals(request.method, 'Host.getAllVmStats') self.assertTrue(len(ids) == 1) @@ -337,3 +365,84 @@ resp_frame = adapter.pop_message() self.assertEquals(resp_frame.command, Command.MESSAGE) + + +class RedirectTests(TestCaseBase): + + def test_subscription(self): + frame = Frame(Command.SUBSCRIBE, + {'ack': 'auto', + Headers.DESTINATION: 'jms.queue.events', + 'id': '1', + 'redirect': 'VM.create'}) + sub_map = defaultdict(list) + + adapter = StompAdapterImpl(Reactor(), sub_map, {}) + adapter.handle_frame(TestDispatcher(adapter), frame) + + sub = sub_map['jms.queue.events'] + + self.assertEquals(len(sub), 1) + subscrption = sub[0] + self.assertEquals(subscrption.id, '1') + + redirects = subscrption.redirects + self.assertEquals(len(redirects), 1) + self.assertEquals(redirects, ['VM.create']) + self.assertEquals(len(adapter._redirects), 1) + + def test_unsubscribe(self): + frame = Frame(Command.UNSUBSCRIBE, + {'id': '1'}) + + subscription = TestSubscription('jms.queue.events', + '1') + subscription.set_redirects(['VM.create']) + sub_map = defaultdict(list) + sub_map['jms.queue.events'].append(subscription) + + adapter = StompAdapterImpl(Reactor(), sub_map, {}) + adapter._sub_ids['1'] = subscription + adapter._redirects['VM.create'].append(subscription) + + adapter.handle_frame(TestDispatcher(adapter), frame) + + self.assertTrue(len(adapter._redirects) == 0) + self.assertTrue(len(adapter._sub_ids) == 0) + self.assertTrue(len(sub_map) == 0) + + def test_redirect(self): + frame = Frame(command=Command.SEND, + headers={Headers.DESTINATION: 'jms.topic.vdsm_requests', + Headers.REPLY_TO: 'jms.topic.vdsm_responses', + Headers.CONTENT_LENGTH: '103'}, + body=('{"jsonrpc":"2.0","method":"VM.create",' + '"params":{},"id":"e8a936a6-d886-4cfa-97b9-2d54209' + '053ff"}' + ) + ) + + subscription = TestSubscription('jms.topic.vdsm_requests', + '1') + client = TestClient() + subscription.set_client(client) + + subscription.set_redirects(['VM.create']) + sub_map = defaultdict(list) + sub_map['jms.topic.vdsm_requests'].append(subscription) + ids = {} + + adapter = StompAdapterImpl(Reactor(), sub_map, ids) + adapter._sub_ids['1'] = subscription + adapter._redirects['VM.create'].append(subscription) + + adapter.handle_frame(TestDispatcher(adapter), frame) + + self.assertFalse(adapter.has_outgoing_messages) + self.assertTrue(len(ids) == 0) + + self.assertTrue(client.has_outgoing_messages) + data = client.pop_message() + request = JsonRpcRequest.decode(data.body) + self.assertEquals(request.method, 'VM.create') + self.assertEquals(request.id, 'e8a936a6-d886-4cfa-97b9-2d54209053ff') -- To view, visit https://gerrit.ovirt.org/65846 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com> _______________________________________________ vdsm-patches mailing list -- vdsm-patches@lists.fedorahosted.org To unsubscribe send an email to vdsm-patches-le...@lists.fedorahosted.org