Saggi Mizrahi has uploaded a new change for review.

Change subject: [WIP] EVENTS
......................................................................

[WIP] EVENTS

Change-Id: Id27b5ca1773139932eb5cb16921d5abec4991c5e
Signed-off-by: Saggi Mizrahi <[email protected]>
---
M lib/stompClient.py
M lib/vdsm/config.py.in
M lib/vdsm/sslutils.py
M lib/yajsonrpc/betterAsyncore.py
M lib/yajsonrpc/stomp.py
M lib/yajsonrpc/stompReactor.py
M tests/jsonRpcHelper.py
M tests/jsonRpcTests.py
M tests/miscTests.py
M tests/protocoldetectorTests.py
M vdsm.spec.in
M vdsm/API.py
M vdsm/clientIF.py
M vdsm/protocoldetector.py
M vdsm/rpc/BindingJsonRpc.py
15 files changed, 336 insertions(+), 141 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/69/38069/1

diff --git a/lib/stompClient.py b/lib/stompClient.py
index d3cd0ab..5a6d7f5 100644
--- a/lib/stompClient.py
+++ b/lib/stompClient.py
@@ -1,6 +1,12 @@
 #!/usr/bin/python
 import yajsonrpc as yjrpc
-import yajsonrpc.stompReactor as sr
+from yajsonrpc.stompReactor import (
+    _DEFAULT_REQUEST_DESTINATION,
+    _DEFAULT_RESPONSE_DESTINATION,
+    ClientRpcTransportAdapter,
+    ServerRpcContextAdapter,
+    StompReactor,
+)
 import socket
 from threading import Thread, Lock
 import time
@@ -14,7 +20,7 @@
     if _reactor is None:
         with _reactorLock:
             if _reactor is None:
-                _reactor = sr.StompReactor()
+                _reactor = StompReactor()
                 t = Thread(target=_reactor.process_requests)
                 t.setDaemon(True)
                 t.start()
@@ -42,95 +48,38 @@
     t.setDaemon(True)
     t.start()
     sub = stomp_client.subscribe(
-        sr._DEFAULT_REQUEST_DESTINATION,
+        _DEFAULT_REQUEST_DESTINATION,
         message_handler=ServerRpcContextAdapter.subscription_handler(server)
     )
     server._sub_ = sub
     return server
 
 
-class ServerRpcContextAdapter(object):
-    @classmethod
-    def subscription_handler(cls, server):
-        def handler(sub, frame):
-            server.queueRequest(
-                (
-                    ServerRpcContextAdapter(sub.client, frame),
-                    frame.body
-                )
-            )
-
-        return handler
-
-    def __init__(self, client, request_frame):
-        self._client = client
-        self._reply_to = request_frame.headers.get('reply-to', None)
-
-    def get_local_address(self, *args, **kwargs):
-        return ""
-
-    def send(self, data):
-        if self._reply_to is None:
-            return
-
-        self._client.send(
-            self._reply_to,
-            data,
-            {
-                "content-type": "application/json",
-            }
-        )
-
-
-class ClientRpcTransportAdapter(object):
-    def __init__(self, sub, destination, client):
-        self._sub = sub
-        sub.set_message_handler(self._handle_message)
-        self._destination = destination
-        self._client = client
-        self._message_handler = lambda arg: None
-
-    def setMessageHandler(self, handler):
-        self._message_handler = handler
-
-    def send(self, data):
-        headers = {
-            "content-type": "application/json",
-            "reply-to": self._sub.destination,
-        }
-        self._client.send(
-            data,
-            self._destination,
-            headers,
-        )
-
-    def _handle_message(self, sub, frame):
-        self._message_handler((self, frame.body))
-
-    def close(self):
-        self._sub.unsubscribe()
-
-
 def connect(address):
     sock = socket.create_connection(address)
     reactor = get_reactor()
     stomp_client = reactor.createClient(sock)
-    subscription = stomp_client.subscribe(sr._DEFAULT_RESPONSE_DESTINATIOM)
+    subscription = stomp_client.subscribe(
+        _DEFAULT_RESPONSE_DESTINATION,
+        sub_id="__vdsm_fake_broker__",
+    )
     client = yjrpc.JsonRpcClient(
         ClientRpcTransportAdapter(
             subscription,
-            sr._DEFAULT_REQUEST_DESTINATION,
+            _DEFAULT_REQUEST_DESTINATION,
             stomp_client,
         )
     )
     return client
 
 
-BROKER_ADDRESS = ("127.0.0.1", 5445)
+BROKER_ADDRESS = ("127.0.0.1", 54321)
 
-server = dummy_server(BROKER_ADDRESS)
+# server = dummy_server(BROKER_ADDRESS)
 
 time.sleep(2)
 
 client = connect(BROKER_ADDRESS)
-client.callMethod("echo", ["123"], 1)
+client.callMethod("Host.ping", [], 1)
+client.callMethod("Host.ping", [], 1)
+client.callMethod("Host.ping", [], 1)
diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in
index 94c6782..fb0150b 100644
--- a/lib/vdsm/config.py.in
+++ b/lib/vdsm/config.py.in
@@ -325,6 +325,20 @@
 
         ('guests_gateway_ip', '', None),
 
+        ('broker_address', '127.0.0.1',
+            'Address where the broker is listening at. Use an empty string '
+            'for none'),
+
+        ('broker_port', '5445',
+            'Port where the broker is listening at.'),
+
+        ('request_queues',
+            'jms.topic.vdsm_requests, jms.topic.vdsm_irs_requests',
+            'Queues to subscribe to for RPC requests'),
+
+        ('topic_prefix', 'jms.topic.',
+            'Prefix to use for events'),
+
     ]),
 
     # Section: [devel]
diff --git a/lib/vdsm/sslutils.py b/lib/vdsm/sslutils.py
index 74555ee..6568842 100644
--- a/lib/vdsm/sslutils.py
+++ b/lib/vdsm/sslutils.py
@@ -35,7 +35,7 @@
 class SSLSocket(object):
     def __init__(self, connection):
         self.connection = connection
-        self._data = None
+        self._data = ""
 
     def gettimeout(self):
         return self.connection.socket.gettimeout()
@@ -57,12 +57,14 @@
     def read(self, size=4096, flag=None):
         result = None
         if flag == socket.MSG_PEEK:
-            self._data = self.connection.read(size)
+            bytes_left = size - len(self._data)
+            if bytes_left > 0:
+                self._data += self.connection.read(bytes_left)
             result = self._data
         else:
             if self._data:
                 result = self._data
-                self._data = None
+                self._data = ""
             else:
                 result = self.connection.read(size)
         return result
diff --git a/lib/yajsonrpc/betterAsyncore.py b/lib/yajsonrpc/betterAsyncore.py
index af88877..8656df8 100644
--- a/lib/yajsonrpc/betterAsyncore.py
+++ b/lib/yajsonrpc/betterAsyncore.py
@@ -18,14 +18,17 @@
 # while enabling compositing instead of inheritance.
 import asyncore
 import socket
-import types
 from errno import EWOULDBLOCK
+import types
 
 from vdsm.infra.eventfd import EventFD
 
 
 class Dispatcher(asyncore.dispatcher):
     def __init__(self, impl=None, sock=None, map=None):
+        # This has to be done before the super initialization because
+        # dispatcher implements __getattr__.
+        self.__impl = None
         asyncore.dispatcher.__init__(self, sock=sock, map=map)
         if impl is not None:
             self.switch_implementation(impl)
@@ -82,8 +85,9 @@
 
         Note that this value is a reccomendation only.
         """
+        impl = self.__impl
         return getattr(
-            self.__impl,
+            impl,
             "next_check_interval",
             lambda d: None
         )(self)
@@ -100,7 +104,7 @@
         self.handle_read()
 
         if hasattr(self.socket, "pending"):
-            while self.socket.pending() > 0:
+            while self.socket.pending() > 0 and self.connected:
                 self.handle_read()
 
     def recv(self, buffer_size):
@@ -115,7 +119,9 @@
                 return data
         except socket.error, why:
             # winsock sometimes raises ENOTCONN
-            if why.args[0] in asyncore._DISCONNECTED:
+            if why.args[0] == EWOULDBLOCK:
+                return None
+            elif why.args[0] in asyncore._DISCONNECTED:
                 self.handle_close()
                 return ''
             else:
diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py
index a3d63f3..cf24a7f 100644
--- a/lib/yajsonrpc/stomp.py
+++ b/lib/yajsonrpc/stomp.py
@@ -16,11 +16,12 @@
 import logging
 import os
 import socket
-from threading import Timer
 from uuid import uuid4
 from collections import deque
 
 import re
+
+FAKE_SUB_ID = "__vdsm_fake_broker__"
 
 _RE_ESCAPE_SEQUENCE = re.compile(r"\\(.)")
 
@@ -75,7 +76,10 @@
 
 class StompError(RuntimeError):
     def __init__(self, frame):
-        RuntimeError.__init__(self, frame.body)
+        self.frame = frame
+        super(RuntimeError, self).__init__(
+            self.body,
+        )
 
 
 class _HeartBeatFrame(object):
@@ -441,10 +445,9 @@
     def _process_message(self, frame, dispatcher):
         sub_id = frame.headers.get(Headers.SUBSCRIPTION)
         if sub_id is None:
-            self.log.warning(
-                "Got message without a subscription"
-            )
-            return
+            # This is actually an error, but because of the fake broker we
+            # injects a fake sub id
+            sub_id = "__vdsm_fake_broker__"
 
         sub = self._subscriptions.get(sub_id)
         if sub is None:
@@ -464,7 +467,8 @@
 
     def send(self, destination, data="", headers=None):
         final_headers = {"destination": destination}
-        final_headers.update(headers)
+        if headers is not None:
+            final_headers.update(headers)
         self.queue_frame(Frame(
             Command.SEND,
             final_headers,
@@ -552,8 +556,3 @@
                       {"id": str(subid)})
         client.put(frame)
         self._valid = False
-
-    def __del__(self):
-        # Using a timer because unsubscribe action might involve taking locks.
-        if self._valid:
-            Timer(0, self.unsubscribe)
diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py
index 5a1b595..44fbcbb 100644
--- a/lib/yajsonrpc/stompReactor.py
+++ b/lib/yajsonrpc/stompReactor.py
@@ -20,15 +20,17 @@
 
 from betterAsyncore import Dispatcher, Reactor
 from vdsm.sslutils import SSLSocket
+from . import (
+    JsonRpcClient,
+    JsonRpcServer,
+)
 
 _STATE_LEN = "Waiting for message length"
 _STATE_MSG = "Waiting for message"
 
 
-_DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses"
+_DEFAULT_RESPONSE_DESTINATION = "jms.topic.vdsm_legacy_responses"
 _DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests"
-
-_FAKE_SUB_ID = "__vdsm_fake_broker__"
 
 
 def parseHeartBeatHeader(v):
@@ -177,13 +179,16 @@
     def check_read(self):
             self._stompConn._dispatcher.handle_read_event()
 
-    def send(self, message):
+    def send(
+        self,
+        message,
+        destination=_DEFAULT_RESPONSE_DESTINATION,
+    ):
         self.log.debug("Sending response")
         res = stomp.Frame(
             stomp.Command.MESSAGE,
             {
-                stomp.Headers.DESTINATION: _DEFAULT_RESPONSE_DESTINATIOM,
-                stomp.Headers.SUBSCRIPTION: _FAKE_SUB_ID,
+                stomp.Headers.DESTINATION: destination,
                 stomp.Headers.CONTENT_TYPE: "application/json",
             },
             message
@@ -325,3 +330,101 @@
         dispatcher.del_channel()
         self.json_binding.add_socket(self._reactor, client_socket)
         self.log.debug("Stomp detected from %s", socket_address)
+
+
+class ServerRpcContextAdapter(object):
+    @classmethod
+    def subscription_handler(cls, server):
+        def handler(sub, frame):
+            server.queueRequest(
+                (
+                    ServerRpcContextAdapter(sub.client, frame),
+                    frame.body
+                )
+            )
+
+        return handler
+
+    def __init__(self, client, request_frame):
+        self._client = client
+        self._reply_to = request_frame.headers.get('reply-to', None)
+
+    def get_local_address(self, *args, **kwargs):
+        return ""
+
+    def send(self, data):
+        if self._reply_to is None:
+            return
+
+        self._client.send(
+            self._reply_to,
+            data,
+            {
+                "content-type": "application/json",
+            }
+        )
+
+
+class ClientRpcTransportAdapter(object):
+    def __init__(self, sub, destination, client):
+        self._sub = sub
+        sub.set_message_handler(self._handle_message)
+        self._destination = destination
+        self._client = client
+        self._message_handler = lambda arg: None
+
+    def setMessageHandler(self, handler):
+        self._message_handler = handler
+
+    def send(self, data):
+        headers = {
+            "content-type": "application/json",
+            "reply-to": self._sub.destination,
+        }
+        self._client.send(
+            data,
+            self._destination,
+            headers,
+        )
+
+    def _handle_message(self, sub, frame):
+        self._message_handler((self, frame.body))
+
+    def close(self):
+        self._sub.unsubscribe()
+
+
+def StompRpcClient(
+    stomp_client,
+    request_queue,
+    response_queue
+):
+    sub_id = None
+    if request_queue == stomp.FAKE_SUB_ID:
+        sub_id = stomp.FAKE_SUB_ID
+
+    return JsonRpcClient(
+        ClientRpcTransportAdapter(
+            stomp_client.subscribe(response_queue, sub_id=sub_id),
+            request_queue,
+            stomp_client,
+        )
+    )
+
+
+def StompRpcServer(
+    bridge,
+    stomp_client,
+    request_queue,
+):
+    server = JsonRpcServer(
+        bridge,
+    )
+
+    sub = stomp_client.subscribe(
+        request_queue,
+        message_handler=ServerRpcContextAdapter.subscription_handler(server)
+    )
+
+    server._sub_ = sub
+    return server
diff --git a/tests/jsonRpcHelper.py b/tests/jsonRpcHelper.py
index b3eb1fd..2fb7cfb 100644
--- a/tests/jsonRpcHelper.py
+++ b/tests/jsonRpcHelper.py
@@ -27,9 +27,17 @@
 from itertools import product
 from M2Crypto import SSL
 from rpc.BindingXMLRPC import BindingXMLRPC, XmlDetector
-from yajsonrpc.stompReactor import StompDetector
+from yajsonrpc.betterAsyncore import (
+    Reactor,
+)
+from yajsonrpc.stompReactor import (
+    StompDetector,
+    StompRpcClient,
+)
+from yajsonrpc.stomp import (
+    FAKE_SUB_ID,
+)
 from protocoldetector import MultiProtocolAcceptor
-from yajsonrpc import JsonRpcClient
 from rpc.BindingJsonRpc import BindingJsonRpc
 from sslhelper import DEAFAULT_SSL_CONTEXT
 
@@ -60,7 +68,13 @@
 @contextmanager
 def constructAcceptor(log, ssl, jsonBridge):
     sslctx = DEAFAULT_SSL_CONTEXT if ssl else None
-    acceptor = MultiProtocolAcceptor("127.0.0.1", 0, sslctx)
+    reactor = Reactor()
+    acceptor = MultiProtocolAcceptor(
+        reactor,
+        "127.0.0.1",
+        0,
+        sslctx,
+    )
     cif = FakeClientIf()
 
     xml_binding = BindingXMLRPC(cif, cif.log)
@@ -73,8 +87,10 @@
     stompDetector = StompDetector(json_binding)
     acceptor.add_detector(stompDetector)
 
-    thread = threading.Thread(target=acceptor.serve_forever,
-                              name='Detector thread')
+    thread = threading.Thread(
+        target=reactor.process_requests,
+        name='Detector thread',
+    )
     thread.setDaemon(True)
     thread.start()
 
@@ -103,9 +119,12 @@
                     reactor = handler._reactor
 
         if not client:
-            client = lambda client_socket: (
-                JsonRpcClient(reactor.createClient(client_socket))
-            )
+            def client(client_socket):
+                return StompRpcClient(
+                    reactor.createClient(client_socket),
+                    FAKE_SUB_ID,
+                    FAKE_SUB_ID,
+                )
 
         def clientFactory():
             return client(create_socket(
diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py
index 230b61a..3254393 100644
--- a/tests/jsonRpcTests.py
+++ b/tests/jsonRpcTests.py
@@ -83,12 +83,7 @@
     @contextmanager
     def _client(self, clientFactory):
             client = clientFactory()
-            client.setTimeout(CALL_TIMEOUT)
-            client.connect()
-            try:
-                yield client
-            finally:
-                client.close()
+            yield client
 
     @MonkeyPatch(clientIF, 'getInstance', getInstance)
     @permutations(PERMUTATIONS)
diff --git a/tests/miscTests.py b/tests/miscTests.py
index 1fa572e..f98f539 100644
--- a/tests/miscTests.py
+++ b/tests/miscTests.py
@@ -991,6 +991,7 @@
         openFds = openFdNum()
         self.testStdOut()
         gc.collect()
+        print gc.garbage
         self.assertEquals(len(gc.garbage), 0)
         self.assertEquals(openFdNum(), openFds)
 
diff --git a/tests/protocoldetectorTests.py b/tests/protocoldetectorTests.py
index ea64619..c64a1e2 100644
--- a/tests/protocoldetectorTests.py
+++ b/tests/protocoldetectorTests.py
@@ -23,9 +23,12 @@
 import ssl
 import threading
 import time
-from contextlib import contextmanager, closing
+from contextlib import contextmanager
 
 import protocoldetector
+from yajsonrpc.betterAsyncore import (
+    Reactor,
+)
 
 from vdsm import sslutils
 from sslhelper import KEY_FILE, CRT_FILE
@@ -59,12 +62,13 @@
 
         def run():
             try:
-                assert client_socket.gettimeout() is None
-                rfile = client_socket.makefile('rb', -1)
-                with closing(rfile):
-                    request = rfile.readline()
-                    response = self.response(request)
-                    client_socket.sendall(response)
+                request = ""
+                while "\n" not in request:
+                    request += dispatcher.recv(1024)
+
+                response = self.response(request)
+                client_socket.setblocking(1)
+                client_socket.sendall(response)
             finally:
                 client_socket.shutdown(socket.SHUT_RDWR)
                 client_socket.close()
@@ -182,14 +186,14 @@
 
     def check_slow_client(self, use_ssl):
         with self.connect(use_ssl) as client:
-            time.sleep(self.acceptor.CLEANUP_INTERVAL - self.GRACETIME)
+            time.sleep(self.acceptor._handshake_timeout - self.GRACETIME)
             data = "echo let me in\n"
             client.sendall(data)
             self.assertEqual(client.recv(self.BUFSIZE), data)
 
     def check_very_slow_client(self, use_ssl):
         with self.connect(use_ssl) as client:
-            time.sleep(self.acceptor.CLEANUP_INTERVAL * 2 + self.GRACETIME)
+            time.sleep(self.acceptor._handshake_timeout * 2 + self.GRACETIME)
             client.sendall("echo too slow probably\n")
             self.check_disconnected(client)
 
@@ -204,12 +208,18 @@
     # Helpers
 
     def start_acceptor(self, use_ssl):
-        self.acceptor = TestingAcceptor(
-            '127.0.0.1', 0, sslctx=self.SSLCTX if use_ssl else None)
+        self.reactor = Reactor()
+        self.acceptor = protocoldetector.MultiProtocolAcceptor(
+            self.reactor,
+            '127.0.0.1',
+            0,
+            sslctx=self.SSLCTX if use_ssl else None
+        )
+        self.acceptor._handshake_timeout = 1
         self.acceptor.add_detector(Echo())
         self.acceptor.add_detector(Uppercase())
-        self.acceptor_address = self.acceptor._socket.getsockname()
-        t = threading.Thread(target=self.acceptor.serve_forever)
+        self.acceptor_address = self.acceptor._acceptor.socket.getsockname()
+        t = threading.Thread(target=self.reactor.process_requests)
         t.deamon = True
         t.start()
 
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 0740530..845ddf0 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1606,6 +1606,7 @@
 %{python_sitelib}/yajsonrpc/stompReactor.py*
 
 %files infra
+%{python_sitelib}/%{vdsm_name}/infra/eventfd/__init__.py*
 %{python_sitelib}/%{vdsm_name}/infra/filecontrol/__init__.py*
 %{python_sitelib}/%{vdsm_name}/infra/sigutils/__init__.py*
 %{python_sitelib}/%{vdsm_name}/infra/zombiereaper/__init__.py*
diff --git a/vdsm/API.py b/vdsm/API.py
index 42c471e..fdf2c62 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -1267,6 +1267,9 @@
     def ping(self):
         "Ping the server. Useful for tests"
         updateTimestamp()
+        n = self._cif.create_notification("vdsm.ping")
+        n.emit(pinged=True)
+
         return {'status': doneCode}
 
     def getRoute(self, ip):
@@ -1286,7 +1289,6 @@
         c = caps.get()
         c['netConfigDirty'] = str(self._cif._netConfigDirty)
         c = hooks.after_get_caps(c)
-
         return {'status': doneCode, 'info': c}
 
     def getHardwareInfo(self):
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index d29f8c2..46e7f41 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -23,6 +23,8 @@
 import time
 import threading
 import uuid
+import socket
+import json
 from functools import partial
 from weakref import proxy
 
@@ -54,6 +56,35 @@
     _glusterEnabled = True
 except ImportError:
     _glusterEnabled = False
+
+from yajsonrpc.betterAsyncore import (
+    Reactor,
+)
+
+from yajsonrpc.stompReactor import (
+    StompClient,
+    StompRpcServer,
+)
+
+
+class Notification(object):
+    def __init__(self, event_id, cb):
+        self._event_id = event_id
+        self._cb = cb
+
+    def emit(self, **kwargs):
+        notification = json.dumps(
+            {
+                'jsonrpc': '2.0',
+                'method': self._event_id,
+                'params': kwargs,
+            }
+        )
+
+        self._cb(
+            notification,
+            self._event_id
+        )
 
 
 class clientIF(object):
@@ -87,6 +118,7 @@
         self._generationID = str(uuid.uuid4())
         self.mom = None
         self.bindings = {}
+        self._clients = []
         if _glusterEnabled:
             self.gluster = gapi.GlusterApi(self, log)
         else:
@@ -109,9 +141,11 @@
 
             host = config.get('addresses', 'management_ip')
             port = config.getint('addresses', 'management_port')
+            self._createReactor()
             self._createAcceptor(host, port)
             self._prepareXMLRPCBinding()
             self._prepareJSONRPCBinding()
+            self._connectToBroker()
         except:
             self.log.error('failed to init clientIF, '
                            'shutting down storage dispatcher')
@@ -124,6 +158,21 @@
     @property
     def ready(self):
         return (self.irs is None or self.irs.ready) and not self._recovery
+
+    def create_notification(self, event_id):
+        prefix = config.get('addresses', 'topic_prefix')
+        return Notification(
+            prefix + event_id,
+            self._send_notification,
+        )
+
+    def _send_notification(self, message, destination):
+        if self._broker_client:
+            self._broker_client.send(message, destination)
+
+        self.log.warn("@@@@@")
+        for client in self._clients[:]:
+            client.send(message, destination)
 
     def contEIOVms(self, sdUUID, isDomainStateValid):
         # This method is called everytime the onDomainStateChange
@@ -158,10 +207,46 @@
                     cls._instance = clientIF(irs, log)
         return cls._instance
 
+    def _createReactor(self):
+        self._reactor = Reactor()
+
     def _createAcceptor(self, host, port):
         sslctx = self._createSSLContext()
 
-        self._acceptor = MultiProtocolAcceptor(host, port, sslctx)
+        self._acceptor = MultiProtocolAcceptor(
+            self._reactor,
+            host,
+            port,
+            sslctx
+        )
+
+    def _connectToBroker(self):
+        broker_address = config.get('addresses', 'broker_address')
+        broker_port = config.getint('addresses', 'broker_port')
+        request_queues = config.get('addresses', 'request_queues')
+        self._broker_client = None
+        if broker_address:
+            sslctx = self._createSSLContext()
+            sock = socket.socket()
+            sock.connect((broker_address, broker_port))
+            if sslctx:
+                sock = sslctx.wrapSocket(sock)
+
+            stomp_client = StompClient(sock, self._reactor)
+            subs = []
+            for destination in request_queues.split(","):
+                destination = destination.strip()
+                if destination:
+                    subs.append(
+                        StompRpcServer(
+                            self.bindings['jsonrpc'].server,
+                            stomp_client,
+                            destination,
+                        )
+                    )
+
+            self._subscriptions = subs
+            self._broker_client = stomp_client
 
     def _createSSLContext(self):
         sslctx = None
@@ -200,7 +285,7 @@
                               'Please make sure it is installed.')
             else:
                 bridge = Bridge.DynamicBridge()
-                json_binding = BindingJsonRpc(bridge)
+                json_binding = BindingJsonRpc(bridge, self._clients)
                 self.bindings['jsonrpc'] = json_binding
                 stomp_detector = StompDetector(json_binding)
                 self._acceptor.add_detector(stomp_detector)
@@ -243,8 +328,10 @@
     def start(self):
         for binding in self.bindings.values():
             binding.start()
-        self.thread = threading.Thread(target=self._acceptor.serve_forever,
-                                       name='Detector thread')
+        self.thread = threading.Thread(
+            target=self._reactor.process_requests,
+            name='Reactor thread'
+        )
         self.thread.setDaemon(True)
         self.thread.start()
 
diff --git a/vdsm/protocoldetector.py b/vdsm/protocoldetector.py
index 76d5a00..20a0b41 100644
--- a/vdsm/protocoldetector.py
+++ b/vdsm/protocoldetector.py
@@ -23,11 +23,9 @@
 
 from M2Crypto import SSL
 
-from vdsm.utils import traceback
 import vdsm.infra.filecontrol as filecontrol
 from yajsonrpc.betterAsyncore import (
     Dispatcher,
-    Reactor,
 )
 
 from vdsm.utils import (
@@ -48,8 +46,6 @@
     filecontrol.set_close_on_exec(server_socket.fileno())
     server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     server_socket.bind(addr[0][4])
-
-    server_socket.setblocking(0)
     return server_socket
 
 
@@ -75,6 +71,7 @@
         except socket.error:
             pass
 
+        client.setblocking(0)
         self.log.info("Accepting connection from %s:%d", *client.getpeername())
         self._dispatcher_factory(client)
 
@@ -97,8 +94,16 @@
         sock = dispatcher.socket
         try:
             data = sock.recv(self._required_size, socket.MSG_PEEK)
-        except socket.error:
+        except socket.error, why:
+            if why.args[0] == socket.EWOULDBLOCK:
+                return
+
             dispatcher.handle_error()
+            return
+
+        if monotonic_time() > self._give_up_at:
+            self.log.debug("Timed out while waiting for data")
+            dispatcher.close()
             return
 
         if len(data) < self._required_size:
@@ -210,13 +215,15 @@
 
     def __init__(
         self,
+        reactor,
         host,
         port,
         sslctx=None,
         ssl_hanshake_timeout=SSL_HANDSHAKE_TIMEOUT,
     ):
         self._sslctx = sslctx
-        self._reactor = Reactor()
+        self._reactor = reactor
+
         sock = _create_socket(host, port)
         self._host, self._port = sock.getsockname()
         self.log.info("Listening at %s:%d", self._host, self._port)
@@ -261,13 +268,6 @@
         )
 
         return dispatcher
-
-    @traceback(on=log.name)
-    def serve_forever(self):
-        self.log.debug("Running")
-        required_size = max(h.REQUIRED_SIZE for h in self._handlers)
-        self.log.debug("Using required_size=%d", required_size)
-        self._reactor.process_requests()
 
     def add_detector(self, detector):
         self.log.debug("Adding detector %s", detector)
diff --git a/vdsm/rpc/BindingJsonRpc.py b/vdsm/rpc/BindingJsonRpc.py
index a5b5b4a..eeea8ba 100644
--- a/vdsm/rpc/BindingJsonRpc.py
+++ b/vdsm/rpc/BindingJsonRpc.py
@@ -29,14 +29,21 @@
 class BindingJsonRpc(object):
     log = logging.getLogger('BindingJsonRpc')
 
-    def __init__(self, bridge):
+    def __init__(self, bridge, clients=None):
         self._server = JsonRpcServer(bridge, _simpleThreadFactory)
+        self._clients = clients
         self._reactors = []
+
+    @property
+    def server(self):
+        return self._server
 
     def add_socket(self, reactor, client_socket):
         reactor.createListener(client_socket, self._onAccept)
 
     def _onAccept(self, listener, client):
+        if self._clients is not None:
+            self._clients.append(client)
         client.setMessageHandler(self._server.queueRequest)
 
     def createStompReactor(self):


-- 
To view, visit http://gerrit.ovirt.org/38069
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id27b5ca1773139932eb5cb16921d5abec4991c5e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to