Saggi Mizrahi has uploaded a new change for review. Change subject: [WIP] AMQP support ......................................................................
[WIP] AMQP support Change-Id: I850005e7375472bbf6238fd38b10d8f1b5e2a191 Signed-off-by: Saggi Mizrahi <[email protected]> --- M tests/jsonRpcTests.py A vdsm_api/jsonrpc/protonReactor.py 2 files changed, 380 insertions(+), 1 deletion(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/87/9987/1 diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index 85b98a6..dd9148f 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -22,12 +22,22 @@ from contextlib import contextmanager from functools import partial from contextlib import closing +import uuid + +from nose.plugins.skip import SkipTest from testrunner import VdsmTestCase as TestCaseBase, \ expandPermutations, \ permutations from jsonrpc import tcpReactor + +protonReactor = None +try: + import proton + from jsonrpc import protonReactor +except ImportError: + pass PORT_RANGE = xrange(49152, 65535) @@ -65,7 +75,22 @@ reactor.stop() -REACTOR_CONSTRUCTORS = {"tcp": _tcpServerConstructor} +@contextmanager +def _protonServerConstructor(messageHandler): + if protonReactor is None: + raise SkipTest("qpid-proton python bindings are not installed") + + port = _getFreePort() + serverAddress = "amqp://127.0.0.1:%d" % (port,) + reactor = protonReactor.ProtonReactor(("127.0.0.1", port), messageHandler) + + try: + yield reactor, partial(ProtonReactorClient, serverAddress) + finally: + reactor.stop() + +REACTOR_CONSTRUCTORS = {"tcp": _tcpServerConstructor, + "proton": _protonServerConstructor} REACTOR_TYPE_PERMUTATIONS = [[r] for r in REACTOR_CONSTRUCTORS.iterkeys()] @@ -103,6 +128,58 @@ self.sock.close() +class ProtonReactorClient(object): + def __init__(self, serverAddress): + self._serverAddress = serverAddress + self._msngr = proton.Messenger("client-%s" % str(uuid.uuid4())) + + def connect(self): + self._msngr.start() + + def sendMessage(self, data, timeout=None): + if timeout is None: + timeout = -1 + else: + timeout *= 1000 + + msg = proton.Message() + msg.address = self._serverAddress + msg.body = unicode(data) + self._msngr.timeout = timeout + t = self._msngr.put(msg) + try: + self._msngr.send() + except: + self._msngr.settle(t) + raise + + def recvMessage(self, timeout=None): + if timeout is None: + timeout = -1 + else: + timeout *= 1000 + + self._msngr.timeout = timeout + self._msngr.recv(10) + + if not self._msngr.incoming: + raise socket.timeout() + + if self._msngr.incoming > 1: + raise Exception("Got %d repsones instead of 1" % + self._msngr.incoming) + + msg = proton.Message() + self._msngr.get(msg) + return msg.body + + def close(self): + try: + self._msngr.stop() + except: + pass + + @expandPermutations class ReactorTests(TestCaseBase): @permutations(REACTOR_TYPE_PERMUTATIONS) diff --git a/vdsm_api/jsonrpc/protonReactor.py b/vdsm_api/jsonrpc/protonReactor.py new file mode 100644 index 0000000..2138427 --- /dev/null +++ b/vdsm_api/jsonrpc/protonReactor.py @@ -0,0 +1,302 @@ +# Copyright (C) 2012 Saggi Mizrahi, Red Hat Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public +# License along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import threading +import logging +import uuid +from Queue import Queue, Empty + +import proton + +FAILED = 0 +CONNECTED = 1 +AUTHENTICATING = 2 + + +class ProtonContext(object): + log = logging.getLogger("jsonrpc.ProtonContext") + + def __init__(self, reactor, msg): + self._reactor = reactor + self._msg = msg + + @property + def data(self): + return self._msg.body + + def sendReply(self, data): + self._reactor.sendReply(self, data) + + +class ProtonReactor(object): + log = logging.getLogger("jsonrpc.ProtonReactor") + + def __init__(self, address, messageHandler): + self._messageHandler = messageHandler + host, port = address + self.host = host + self.port = port + + self._isRunning = False + + self._hasData = threading.Event() + + self.driver = proton.pn_driver() + self.listener = proton.pn_listener(self.driver, self.host, + str(self.port), None) + if self.listener is None: + raise RuntimeError("Could not listen on %s:%s" % (self.host, + self.port)) + + self._outQueue = Queue() + self._outMesseges = [] + + def _convertTimeout(self, timeout): + """ + Timeouts in puthon are usually floats representing seconds, this + converts the conventional python timeout to proton compatible + millisecond timeouts + """ + + if timeout is None: + return -1 + + return int(timeout * 1000) + + def _waitDriverEvent(self, timeout=None): + self.log.debug("Waiting for events") + timeout = self._convertTimeout(timeout) + proton.pn_driver_wait(self.driver, timeout) + + def _acceptConnectionRequests(self): + l = proton.pn_driver_listener(self.driver) + while l: + self.log.debug("Accepting Connection.") + cxtr = proton.pn_listener_accept(l) + proton.pn_connector_set_context(cxtr, AUTHENTICATING) + + l = proton.pn_driver_listener(self.driver) + + def _authenticateConnector(self, cxtr): + self.log.debug("Authenticating...") + sasl = proton.pn_connector_sasl(cxtr) + state = proton.pn_sasl_state(sasl) + while state == proton.PN_SASL_CONF or state == proton.PN_SASL_STEP: + if state == proton.PN_SASL_CONF: + self.log.debug("Authenticating-CONF...") + proton.pn_sasl_mechanisms(sasl, "ANONYMOUS") + proton.pn_sasl_server(sasl) + elif state == proton.PN_SASL_STEP: + self.log.debug("Authenticating-STEP...") + mech = proton.pn_sasl_remote_mechanisms(sasl) + if mech == "ANONYMOUS": + proton.pn_sasl_done(sasl, proton.PN_SASL_OK) + else: + proton.pn_sasl_done(sasl, proton.PN_SASL_AUTH) + state = proton.pn_sasl_state(sasl) + + if state == proton.PN_SASL_PASS: + proton.pn_connector_set_connection(cxtr, proton.pn_connection()) + proton.pn_connector_set_context(cxtr, CONNECTED) + self.log.debug("Authentication-PASSED") + elif state == proton.PN_SASL_FAIL: + proton.pn_connector_set_context(cxtr, FAILED) + self.log.debug("Authentication-FAILED") + else: + self.log.debug("Authentication-PENDING") + + def _processConnectors(self): + cxtr = proton.pn_driver_connector(self.driver) + while cxtr: + self.log.debug("Process Connector") + + # releaes any connector that has been closed + if proton.pn_connector_closed(cxtr): + self.log.debug("Closing connector") + proton.pn_connector_free(cxtr) + else: + proton.pn_connector_process(cxtr) + + state = proton.pn_connector_context(cxtr) + if state == AUTHENTICATING: + self._authenticateConnector(cxtr) + elif state == CONNECTED: + self._serviceConnector(cxtr) + else: + self.log.warning("Unknown Connection state '%s'" % state) + + proton.pn_connector_process(cxtr) + + cxtr = proton.pn_driver_connector(self.driver) + + def _initConnection(self, conn): + if proton.pn_connection_state(conn) & proton.PN_LOCAL_UNINIT: + self.log.debug("Connection Opened.") + proton.pn_connection_open(conn) + + def _openPendingSessions(self, conn): + ssn = proton.pn_session_head(conn, proton.PN_LOCAL_UNINIT) + while ssn: + proton.pn_session_open(ssn) + self.log.debug("Session Opened.") + ssn = proton.pn_session_next(ssn, proton.PN_LOCAL_UNINIT) + + def _openLinks(self, conn): + link = proton.pn_link_head(conn, proton.PN_LOCAL_UNINIT) + while link: + self.log.debug("Opening Link") + proton.pn_terminus_copy(proton.pn_link_source(link), + proton.pn_link_remote_source(link)) + proton.pn_terminus_copy(proton.pn_link_target(link), + proton.pn_link_remote_target(link)) + + if proton.pn_link_is_sender(link): + linkSource = proton.pn_link_source(link) + addr = proton.pn_terminus_get_address(linkSource) + + self.log.debug("Opening Link to send data to '%s'" % addr) + proton.pn_delivery(link, + "VDSM-delivery-%d" % str(uuid.uuid4())) + else: + linkTarget = proton.pn_link_target(link) + addr = proton.pn_terminus_get_address(linkTarget) + self.log.debug("Opening Link to recv messages from '%s'" % + addr) + proton.pn_link_flow(link, 1) + + proton.pn_link_open(link) + link = proton.pn_link_next(link, proton.PN_LOCAL_UNINIT) + + def _distributeMessages(self): + while True: + try: + msg = self._outQueue.get_nowait() + except Empty: + break + + self._outMesseges.append(msg) + + def _processDeliveries(self, conn): + delivery = proton.pn_work_head(conn) + while delivery: + self.log.debug("Process delivery %s." % + proton.pn_delivery_tag(delivery)) + + if proton.pn_delivery_readable(delivery): + self._processIncoming(delivery) + elif proton.pn_delivery_writable(delivery): + self._processOutgoing(delivery) + + delivery = proton.pn_work_next(delivery) + + def _cleanLinks(self, conn): + link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE | + proton.PN_REMOTE_CLOSED)) + while link: + proton.pn_link_close(link) + link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE | + proton.PN_REMOTE_CLOSED)) + + def _cleanSessions(self, conn): + ssn = proton.pn_session_head(conn, (proton.PN_LOCAL_ACTIVE | + proton.PN_REMOTE_CLOSED)) + while ssn: + proton.pn_session_close(ssn) + self.log.debug("Session Closed") + ssn = proton.pn_session_next(ssn, (proton.PN_LOCAL_ACTIVE | + proton.PN_REMOTE_CLOSED)) + + def _teardownConnection(self, conn): + if proton.pn_connection_state(conn) == ((proton.PN_LOCAL_ACTIVE | + proton.PN_REMOTE_CLOSED)): + proton.pn_connection_close(conn) + + def _serviceConnector(self, cxtr): + self.log.debug("Service Connector") + conn = proton.pn_connector_connection(cxtr) + self._initConnection(conn) + self._openPendingSessions(conn) + self._openLinks(conn) + self._processDeliveries(conn) + self._cleanLinks(conn) + + conn = proton.pn_connector_connection(cxtr) + + def _processIncoming(self, delivery): + link = proton.pn_delivery_link(delivery) + msg = [] + rc, buff = proton.pn_link_recv(link, 1024) + while rc >= 0: + msg.append(buff) + rc, buff = proton.pn_link_recv(link, 1024) + + msg = ''.join(msg) + + proton.pn_delivery_update(delivery, proton.PN_ACCEPTED) + msgObj = proton.Message() + msgObj.decode(msg) + self._messageHandler.handleMessage(ProtonContext(self, msgObj)) + + proton.pn_delivery_settle(delivery) + proton.pn_link_advance(link) + + # if more credit is needed, grant it + if proton.pn_link_credit(link) == 0: + proton.pn_link_flow(link, 1) + + def _processOutgoing(self, delivery): + link = proton.pn_delivery_link(delivery) + mbox = proton.pn_terminus_get_address(proton.pn_link_source(link)) + print mbox + self.log.debug("Request for Mailbox=%s" % str(mbox)) + if mbox in self.mailboxes and self.mailboxes[mbox]: + msg = self.queuedMessages[mbox].pop(0) + self.log.debug("Fetched message %s" % str(msg)) + msg = "%s=%s" % (mbox, msg) + else: + print("Warning: mailbox %s is empty, sending empty message." % mbox) + msg = "<EMPTY>" + sent = pn_link_send(link, msg) + assert(sent == len(msg)) + self.log.debug("Msg Sent %d" % sent); + + # if the link can accept more ???RAFI, is that correct????, grant + # another delivery + if pn_link_advance(link): + pn_delivery(link, "server-delivery-%d" % self.counter) + self.counter += 1 + + # do not settle the delivery now - wait until the remote sets the disposition. + + def serve_forever(self): + self._isRunning = True + while self._isRunning: + self._waitDriverEvent() + self._acceptConnectionRequests() + self._processConnectors() + + def sendReply(self, ctx, data): + msg = proton.Message() + msg.address = ctx._msg.reply_to + print msg.address + msg.body = data + self._outQueue.put_nowait(msg) + + def stop(self): + #TODO: proper cleanup, release all connections + self._isRunning = False + #proton.pn_listener_close(self.listener) + #proton.pn_driver_free(self.driver) + self._isRunning = False -- To view, visit http://gerrit.ovirt.org/9987 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I850005e7375472bbf6238fd38b10d8f1b5e2a191 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
