Saggi Mizrahi has uploaded a new change for review.

Change subject: asyncore: Move generic reactor functionality
......................................................................

asyncore: Move generic reactor functionality

This is part of a larger move to unify the protocol detector and the
stomp reactor. In this phase we move all the functionality not specific
to stomp to a generic Reactor class. This patch still keeps the original
StompReactor to minimize changes outside of yajsonrpc.

Change-Id: I59a14b18f4c08d873763e1d3bd9d42b99de05fb1
Signed-off-by: Saggi Mizrahi <[email protected]>
---
M lib/yajsonrpc/betterAsyncore.py
M lib/yajsonrpc/stompReactor.py
2 files changed, 44 insertions(+), 24 deletions(-)

  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/55/37055/1

diff --git a/lib/yajsonrpc/betterAsyncore.py b/lib/yajsonrpc/betterAsyncore.py
index 0b9b99e..f58acd7 100644
--- a/lib/yajsonrpc/betterAsyncore.py
+++ b/lib/yajsonrpc/betterAsyncore.py
@@ -348,3 +348,36 @@
             pass

         asyncore.file_dispatcher.close(self)
+
+
+class Reactor(object):
+    def __init__(self):
+        self._map = {}
+        self._isRunning = False
+        self._wakeupEvent = AsyncoreEvent(self._map)
+
+    def add_dispatcher(self, disp):
+        disp.add_channel(self._map)
+
+    def remove_dispatcher(self, disp):
+        disp.del_channel(self._map)
+
+    def process_requests(self):
+        self._isRunning = True
+        while self._isRunning:
+            asyncore.loop(use_poll=True, map=self._map, count=1, timeout=.5)
+
+        for key, dispatcher in self._map.items():
+            del self._map[key]
+            dispatcher.close()
+
+    def wakeup(self):
+        self._wakeupEvent.set()
+
+    def stop(self):
+        self._isRunning = False
+        try:
+            self.wakeup()
+        except (IOError, OSError):
+            # Client woke up and closed the event dispatcher without our help
+            pass
diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py
index 1930de8..70c1bd1 100644
--- a/lib/yajsonrpc/stompReactor.py
+++ b/lib/yajsonrpc/stompReactor.py
@@ -13,11 +13,10 @@
 # License along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

-import asyncore
 import logging

 import stomp
-from betterAsyncore import Dispatcher, AsyncoreEvent
+from betterAsyncore import Dispatcher, Reactor
 from vdsm.sslutils import SSLSocket

 _STATE_LEN = "Waiting for message length"
@@ -242,37 +241,25 @@

 class StompReactor(object):
     def __init__(self):
-        self._map = {}
-        self._isRunning = False
-        self._wakeupEvent = AsyncoreEvent(self._map)
+        self._reactor = Reactor()

     def createListener(self, connected_socket, acceptHandler):
-        listener = StompListener(self, acceptHandler, connected_socket)
-        self.wakeup()
+        listener = StompListener(
+            self._reactor,
+            acceptHandler,
+            connected_socket
+        )
+        self._reactor.wakeup()
         return listener

     def createClient(self, connected_socket):
-        return StompClient(connected_socket, self)
+        return StompClient(connected_socket, self._reactor)

     def process_requests(self):
-        self._isRunning = True
-        while self._isRunning:
-            asyncore.loop(use_poll=True, map=self._map, count=1, timeout=.5)
-
-        for key, dispatcher in self._map.items():
-            del self._map[key]
-            dispatcher.close()
-
-    def wakeup(self):
-        self._wakeupEvent.set()
+        self._reactor.process_requests()

     def stop(self):
-        self._isRunning = False
-        try:
-            self.wakeup()
-        except (IOError, OSError):
-            # Client woke up and closed the event dispatcher without our help
-            pass
+        self._reactor.stop()


 class StompDetector():
--
To view, visit http://gerrit.ovirt.org/37055
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I59a14b18f4c08d873763e1d3bd9d42b99de05fb1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
