Piotr Kliczewski has uploaded a new change for review.

Change subject: stomp: reroute messages to different process
......................................................................

stomp: reroute messages to different process

When a client subscribes, it can use a custom stomp header ("redirect")
to list, comma-separated, the verbs whose messages should be rerouted
to it. Whenever vdsm receives a message for one of those verbs, it
forwards the message to that client instead of processing it itself.


Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d
Signed-off-by: Piotr Kliczewski <piotr.kliczew...@gmail.com>
---
M lib/yajsonrpc/stomp.py
M lib/yajsonrpc/stompreactor.py
M tests/stompAdapterTests.py
3 files changed, 161 insertions(+), 16 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/46/65846/1

diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py
index 558c519..99d09ac 100644
--- a/lib/yajsonrpc/stomp.py
+++ b/lib/yajsonrpc/stomp.py
@@ -563,6 +563,7 @@
         self._valid = True
         self._message_handler = message_handler
         self._destination = destination
+        self._redirects = []
 
     def handle_message(self, frame):
         self._message_handler(self, frame)
@@ -576,6 +577,13 @@
     def set_message_handler(self, handler):
         self._message_handler = handler
 
+    def set_redirects(self, redirects):
+        self._redirects = redirects
+
+    @property
+    def redirects(self):
+        return self._redirects
+
     @property
     def id(self):
         return self._subid
diff --git a/lib/yajsonrpc/stompreactor.py b/lib/yajsonrpc/stompreactor.py
index 42d7d01..61bb98c 100644
--- a/lib/yajsonrpc/stompreactor.py
+++ b/lib/yajsonrpc/stompreactor.py
@@ -15,7 +15,7 @@
 
 from __future__ import absolute_import
 import logging
-from collections import deque
+from collections import deque, defaultdict
 from uuid import uuid4
 import functools
 
@@ -68,6 +68,7 @@
         self._outbox = deque()
         self._sub_dests = sub_map
         self._req_dest = req_dest
+        self._redirects = defaultdict(list)
         self._sub_ids = {}
         request_queues = config.get('addresses', 'request_queues')
         self.request_queues = request_queues.split(",")
@@ -128,6 +129,7 @@
         self.log.info("Subscribe command received")
         destination = frame.headers.get("destination", None)
         sub_id = frame.headers.get("id", None)
+        redirect_value = frame.headers.get("redirect", None)
 
         if not destination or not sub_id:
             self._send_error("Missing destination or subscription id header",
@@ -145,6 +147,12 @@
 
         self._sub_dests[destination].append(subscription)
         self._sub_ids[sub_id] = subscription
+
+        if redirect_value:
+            redirects = redirect_value.split(",")
+            subscription.set_redirects(redirects)
+            for redirect in redirects:
+                self._redirects[redirect].append(subscription)
 
     def _send_error(self, msg, connection):
         res = stomp.Frame(
@@ -188,17 +196,26 @@
         subs = self._sub_dests[subscription.destination]
         if len(subs) == 1:
             del self._sub_dests[subscription.destination]
+            self._remove_redirects(subscription)
         else:
             if subscription in subs:
                 subs.remove(subscription)
+                self._remove_redirects(subscription)
+
+    def _remove_redirects(self, subscription):
+        if subscription.redirects:
+            for redirect in subscription.redirects:
+                reds = self._redirects[redirect]
+                if len(reds) == 1:
+                    del self._redirects[redirect]
+                else:
+                    reds.remove(subscription)
 
     def _cmd_send(self, dispatcher, frame):
         destination = frame.headers.get(stomp.Headers.DESTINATION, None)
         if destination in self.request_queues:
             # default subscription
-            self._handle_internal(dispatcher,
-                                  frame.headers.get(stomp.Headers.REPLY_TO),
-                                  frame.body)
+            self._handle_internal(dispatcher, frame)
             return
         else:
             try:
@@ -224,31 +241,42 @@
                 )
                 subscription.client.send_raw(res)
 
-    def _handle_internal(self, dispatcher, req_dest, request):
+    def _handle_internal(self, dispatcher, frame):
         """
         We need to build response dictionary which maps message id
         with destination. For legacy mode we use known 3.5 destination
         or for standard mode we use 'reply-to' header.
         """
+        requests = []
         try:
-            self._handle_destination(dispatcher, req_dest, json.loads(request))
+            req_dest = frame.headers.get(stomp.Headers.REPLY_TO)
+            requests = self._handle_destination(dispatcher, req_dest,
+                                                frame, json.loads(frame.body))
         except Exception:
             # let json server process issue
             pass
-        dispatcher.connection.handleMessage(request)
+        for request in requests:
+            dispatcher.connection.handleMessage(request)
 
-    def _handle_destination(self, dispatcher, req_dest, request):
+    def _handle_destination(self, dispatcher, req_dest, frame, request):
         """
         We could receive single message or batch of messages. We need
         to build response map for each message.
         """
         if isinstance(request, list):
-            map(functools.partial(self._handle_destination, dispatcher,
-                                  req_dest),
-                request)
-            return
+            results = map(functools.partial(self._handle_destination,
+                                            dispatcher, req_dest, frame),
+                          request)
+            return results
+
+        if request.get("method") in self._redirects.keys():
+            subscriptions = self._redirects[request.get("method")]
+            for subscription in subscriptions:
+                subscription.client.send_raw(frame)
+            return []
 
         self._req_dest[request.get("id")] = req_dest
+        return [request]
 
     def handle_frame(self, dispatcher, frame):
         try:
@@ -318,7 +346,7 @@
     """
     Sends message to all subscribes that subscribed to destination.
     """
-    def send(self, message, destination):
+    def send(self, message, destination='jms.topic.vdsm_responses'):
         resp = json.loads(message)
         response_id = resp.get("id")
 
diff --git a/tests/stompAdapterTests.py b/tests/stompAdapterTests.py
index 610c722..8f98391 100644
--- a/tests/stompAdapterTests.py
+++ b/tests/stompAdapterTests.py
@@ -17,7 +17,7 @@
 #
 # Refer to the README and COPYING files for full details of the license
 #
-from collections import defaultdict
+from collections import defaultdict, deque
 
 from testlib import VdsmTestCase as TestCaseBase
 from yajsonrpc import JsonRpcRequest
@@ -36,6 +36,10 @@
 
     def send_raw(self, msg):
         self._client.queue_frame(msg)
+
+    @property
+    def queue(self):
+        return self._client._outbox
 
     def handleMessage(self, data):
         self._client.queue_frame(data)
@@ -59,6 +63,7 @@
     def __init__(self, destination, id):
         self._destination = destination
         self._id = id
+        self._redirects = []
 
     def set_client(self, client):
         self._client = TestConnection(client)
@@ -74,6 +79,29 @@
     @property
     def client(self):
         return self._client
+
+    def set_redirects(self, redirects):
+        self._redirects = redirects
+
+    @property
+    def redirects(self):
+        return self._redirects
+
+
+class TestClient(object):
+
+    def __init__(self):
+        self._outbox = deque()
+
+    def queue_frame(self, frame):
+        self._outbox.append(frame)
+
+    def pop_message(self):
+        return self._outbox.popleft()
+
+    @property
+    def has_outgoing_messages(self):
+        return (len(self._outbox) > 0)
 
 
 class ConnectFrameTest(TestCaseBase):
@@ -255,7 +283,7 @@
 
         data = adapter.pop_message()
         self.assertIsNot(data, None)
-        request = JsonRpcRequest.decode(data)
+        request = JsonRpcRequest.fromRawObject(data)
         self.assertEquals(request.method, 'Host.getAllVmStats')
         self.assertTrue(len(ids) == 1)
 
@@ -278,7 +306,7 @@
 
         data = adapter.pop_message()
         self.assertIsNot(data, None)
-        request = JsonRpcRequest.decode(data)
+        request = JsonRpcRequest.fromRawObject(data)
         self.assertEquals(request.method, 'Host.getAllVmStats')
         self.assertTrue(len(ids) == 1)
 
@@ -337,3 +365,84 @@
 
         resp_frame = adapter.pop_message()
         self.assertEquals(resp_frame.command, Command.MESSAGE)
+
+
+class RedirectTests(TestCaseBase):
+
+    def test_subscription(self):
+        frame = Frame(Command.SUBSCRIBE,
+                      {'ack': 'auto',
+                       Headers.DESTINATION: 'jms.queue.events',
+                       'id': '1',
+                       'redirect': 'VM.create'})
+        sub_map = defaultdict(list)
+
+        adapter = StompAdapterImpl(Reactor(), sub_map, {})
+        adapter.handle_frame(TestDispatcher(adapter), frame)
+
+        sub = sub_map['jms.queue.events']
+
+        self.assertEquals(len(sub), 1)
+        subscrption = sub[0]
+        self.assertEquals(subscrption.id, '1')
+
+        redirects = subscrption.redirects
+        self.assertEquals(len(redirects), 1)
+        self.assertEquals(redirects, ['VM.create'])
+        self.assertEquals(len(adapter._redirects), 1)
+
+    def test_unsubscribe(self):
+        frame = Frame(Command.UNSUBSCRIBE,
+                      {'id': '1'})
+
+        subscription = TestSubscription('jms.queue.events',
+                                        '1')
+        subscription.set_redirects(['VM.create'])
+        sub_map = defaultdict(list)
+        sub_map['jms.queue.events'].append(subscription)
+
+        adapter = StompAdapterImpl(Reactor(), sub_map, {})
+        adapter._sub_ids['1'] = subscription
+        adapter._redirects['VM.create'].append(subscription)
+
+        adapter.handle_frame(TestDispatcher(adapter), frame)
+
+        self.assertTrue(len(adapter._redirects) == 0)
+        self.assertTrue(len(adapter._sub_ids) == 0)
+        self.assertTrue(len(sub_map) == 0)
+
+    def test_redirect(self):
+        frame = Frame(command=Command.SEND,
+                      headers={Headers.DESTINATION: 'jms.topic.vdsm_requests',
+                               Headers.REPLY_TO: 'jms.topic.vdsm_responses',
+                               Headers.CONTENT_LENGTH: '103'},
+                      body=('{"jsonrpc":"2.0","method":"VM.create",'
+                            '"params":{},"id":"e8a936a6-d886-4cfa-97b9-2d54209'
+                            '053ff"}'
+                            )
+                      )
+
+        subscription = TestSubscription('jms.topic.vdsm_requests',
+                                        '1')
+        client = TestClient()
+        subscription.set_client(client)
+
+        subscription.set_redirects(['VM.create'])
+        sub_map = defaultdict(list)
+        sub_map['jms.topic.vdsm_requests'].append(subscription)
+        ids = {}
+
+        adapter = StompAdapterImpl(Reactor(), sub_map, ids)
+        adapter._sub_ids['1'] = subscription
+        adapter._redirects['VM.create'].append(subscription)
+
+        adapter.handle_frame(TestDispatcher(adapter), frame)
+
+        self.assertFalse(adapter.has_outgoing_messages)
+        self.assertTrue(len(ids) == 0)
+
+        self.assertTrue(client.has_outgoing_messages)
+        data = client.pop_message()
+        request = JsonRpcRequest.decode(data.body)
+        self.assertEquals(request.method, 'VM.create')
+        self.assertEquals(request.id, 'e8a936a6-d886-4cfa-97b9-2d54209053ff')


-- 
To view, visit https://gerrit.ovirt.org/65846
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
_______________________________________________
vdsm-patches mailing list -- vdsm-patches@lists.fedorahosted.org
To unsubscribe send an email to vdsm-patches-le...@lists.fedorahosted.org

Reply via email to