Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/345246 )
Change subject: Port rcstream to EventStreams recentchange stream ...................................................................... Port rcstream to EventStreams recentchange stream RCStream will be decommissioned in July. This change moves the core streaming functionality to eventstreams.py. rcstream.py classes and functions have the same interfaces they had before, but now they wrap eventstreams. This should be a backwards compatible change, and should allow for future EventStreams endpoints to be added as well. Bug: T158943 Change-Id: Ie21d19cdb452122420885617b3270e9aecebc05b --- A pywikibot/comms/eventstreams.py M pywikibot/comms/rcstream.py 2 files changed, 245 insertions(+), 131 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/pywikibot/core refs/changes/46/345246/1 diff --git a/pywikibot/comms/eventstreams.py b/pywikibot/comms/eventstreams.py new file mode 100644 index 0000000..c520778 --- /dev/null +++ b/pywikibot/comms/eventstreams.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +""" +EventStreams based stream client. + +This file is part of the Pywikibot framework. + +This module requires sseclient to be installed: + pip install sseclient +""" +# +# (C) 2014 Merlijn van Deen +# (C) Pywikibot team, 2014-2016 +# (C) Andrew Otto, 2017 +# +# Distributed under the terms of the MIT license. +# +from __future__ import absolute_import, unicode_literals + +__version__ = '$Id$' +# + +import sys +import threading + +from sseclient import SSEClient as EventSource +import json + +if sys.version_info[0] > 2: + from queue import Queue, Empty +else: + from Queue import Queue, Empty + +from pywikibot.bot import debug, warning + +_logger = 'pywikibot.eventstream' + + +class EventStreamThread(threading.Thread): + + """ + Low-level EventStreams Listener Thread, pushing stream events into a queue. + + @param url: The SSE/EventSource endpoint from which to consume events. 
+ @param filter_fn: A function that returns true if an event should be enqueued, + or false if it should be skipped. + @param total: the maximum number of entries to return. The underlying + thread is shut down when this number is reached. + + This runs in a Thread. It makes the actual SSE/EventSource + connection to the EventStreams server and pushes + event dicts into a queue. + + Usage: + + >>> t = EventStreamThread('https://stream.wikimedia.org/v2/stream/recentchange') + >>> t.start() + >>> change = t.queue.get() + >>> change + {'server_name': 'en.wikipedia.org', 'wiki': 'enwiki', 'minor': True, + 'length': {'new': 2038, 'old': 2009}, 'timestamp': 1419964350, + 'server_script_path': '/w', 'bot': False, 'user': 'Od Mishehu', + 'comment': 'stub sorting', 'title': 'Bradwell Bay Wilderness', + 'server_url': 'http://en.wikipedia.org', 'id': 703158386, + 'revision': {'new': 640271171, 'old': 468264850}, + 'type': 'edit', 'namespace': 0} + >>> t.stop() # optional, the thread will shut down on exiting python + """ + def __init__( + self, + url, + filter_fn=None, + total=None + ): + """Constructor for EventStreamThread.""" + + super(EventStreamThread, self).__init__() + self.url = url + self.filter_fn = filter_fn + + self.daemon = True + self.running = False + self.queue = Queue() + + self.warn_queue_length = 100 + + self.total = total + self.count = 0 + + debug('Opening connection to %r' % self, _logger) + + + def __repr__(self): + """Return representation.""" + return "<EventStream from %s>" % (self.url) + + + def enqueue(self, element): + """ + Enqueues element. If we have reached our total, stop the thread. + If the queue grows beyond warn_queue_length, this will + log a warning. 
+ """ + self.count += 1 + self.queue.put(element) + if self.queue.qsize() > self.warn_queue_length: + warning('%r queue length exceeded %i' + % (self, + self.warn_queue_length), + _logger=_logger) + self.warn_queue_length = self.warn_queue_length + 100 + + if self.total is not None and self.count >= self.total: + self.stop() + + + def filter(self, element): + """ + Returns True if not filter_fn or returns filter_fn(element) + This just wraps up filter_fn so we don't have to check for + its existence while filtering. + """ + if not self.filter_fn: + return True + else: + return bool(self.filter_fn(element)) + + + def run(self): + """ + Threaded function. + + Runs inside the thread when started with .start(). + """ + self.running = True + + # Connect to the SSE/EventSource endpoint + eventsource = EventSource(self.url) + while self.running: + # Consume an event. + event = next(eventsource) + + # If event type is error, log and continue. + if event.event == 'error': + warning( + 'Encountered error reading event: %s \'%s\'' + % (self, event.data), _logger=_logger + ) + + # Else assume event.data is a JSON string. + elif event.event == 'message' and event.data: + try: + element = json.loads(event.data) + except Exception as e: + warning('Could not parse event.data as json: %s' + % (event), _logger=_logger + ) + else: + # If this element passes the filter, then enqueue it. + if self.filter(element): + self.enqueue(element) + + debug('Shut down event loop for %r' % self, _logger) + del eventsource + debug('Disconnected %r' % self, _logger) + self.queue.put(None) + + def stop(self): + """Stop the thread.""" + self.running = False + + +def eventstream(url, total=None, filter_fn=None): + """Yield changes received from an EventStream. + + @param url: The SSE/EventSource endpoint from which to consume events. + @param filter_fn: A function returns true if an event should be enqueued, + or false if it should be skipped. + @param total: the maximum number of entries to return. 
The underlying + thread is shut down when this number is reached. + + @return: yield dict event consumed from the stream. + @rtype: generator + """ + thread = EventStreamThread( + url=url, + total=total, + filter_fn=filter_fn + ) + + debug('Starting EventStreams thread from %r' % url, _logger) + thread.start() + + while True: + try: + element = thread.queue.get(timeout=0.1) + except Empty: + continue + if element is None: + return + yield element diff --git a/pywikibot/comms/rcstream.py b/pywikibot/comms/rcstream.py index bb9ed7b..4707b44 100644 --- a/pywikibot/comms/rcstream.py +++ b/pywikibot/comms/rcstream.py @@ -1,15 +1,16 @@ # -*- coding: utf-8 -*- """ -SocketIO-based rcstream client. +EventStreams based RecentChange stream interface. This file is part of the Pywikibot framework. -This module requires socketIO_client to be installed: - pip install socketIO_client +This module requires sseclient to be installed: + pip install sseclient """ # # (C) 2014 Merlijn van Deen # (C) Pywikibot team, 2014-2016 +# (C) 2017 Andrew Otto # # Distributed under the terms of the MIT license. # @@ -18,37 +19,42 @@ __version__ = '$Id$' # -import sys -import threading - -if sys.version_info[0] > 2: - from queue import Queue, Empty -else: - from Queue import Queue, Empty - -try: - import socketIO_client -except ImportError as e: - socketIO_client = e - from pywikibot.bot import debug, warning - +from pywikibot.comms.eventstreams import EventStreamThread, eventstream _logger = 'pywikibot.rcstream' -class RcListenerThread(threading.Thread): +def create_wikihost_filter_fn(wikihost): + if wikihost in [None, '*']: + filter_fn = None + else: + filter_fn = lambda e: e['server_name'] == wikihost + + return filter_fn + + +def get_eventstreams_recentchange_url(rchost, rcport, rcpath): + url = rchost + if rcport: + url += ':' + str(rcport) + url += rcpath + return url + + +class RcListenerThread(EventStreamThread): """ Low-level RC Listener Thread, pushing RC stream events into a queue. 
@param wikihost: the hostname of the wiki we want to get changes for. This - is passed to rcstream using a 'subscribe' command. Pass - '*' to listen to all wikis for a given rc host. - @param rchost: the recent changes stream host to connect to. For Wikimedia + is used to filter events for server_name. Pass + None or '*' to listen to all wikis. + @param rchost: the eventstreams host to connect to. For Wikimedia wikis, this is 'https://stream.wikimedia.org' - @param rcport: the port to connect to (default: 80) - @param rcpath: the sockets.io path. For Wikimedia wikis, this is '/rc'. - (default: '/rc') + @param rcport: the port to connect to (default: None) + @param rcpath: the recentchange stream path. For Wikimedia wikis, + this is '/v2/stream/recentchange'. + (default: '/v2/stream/recentchange') @param total: the maximum number of entries to return. The underlying thread is shut down then this number is reached. @@ -72,102 +78,27 @@ >>> t.stop() # optional, the thread will shut down on exiting python """ - def __init__(self, wikihost, rchost, rcport=80, rcpath='/rc', total=None): + def __init__(self, wikihost=None, rchost='https://stream.wikimedia.org', rcport=None, rcpath='/v2/stream/recentchange', total=None): """Constructor for RcListenerThread.""" - super(RcListenerThread, self).__init__() - self.rchost = rchost - self.rcport = rcport - self.rcpath = rcpath - self.wikihost = wikihost - self.daemon = True - self.running = False - self.queue = Queue() - - self.warn_queue_length = 100 - - self.total = total - self.count = 0 - - debug('Opening connection to %r' % self, _logger) - self.client = socketIO_client.SocketIO(rchost, rcport) - - thread = self - - class RCListener(socketIO_client.BaseNamespace): - def on_change(self, change): - debug('Received change %r' % change, _logger) - if not thread.running: - debug('Thread in shutdown mode; ignoring change.', _logger) - return - - thread.count += 1 - thread.queue.put(change) - if thread.queue.qsize() > 
thread.warn_queue_length: - warning('%r queue length exceeded %i' - % (thread, - thread.warn_queue_length), - _logger=_logger) - thread.warn_queue_length = thread.warn_queue_length + 100 - - if thread.total is not None and thread.count >= thread.total: - thread.stop() - return - - def on_connect(self): - debug('Connected to %r; subscribing to %s' - % (thread, thread.wikihost), - _logger) - self.emit('subscribe', thread.wikihost) - debug('Subscribed to %s' % thread.wikihost, _logger) - - def on_reconnect(self): - debug('Reconnected to %r' % (thread,), _logger) - self.on_connect() - - class GlobalListener(socketIO_client.BaseNamespace): - def on_heartbeat(self): - self._transport.send_heartbeat() - - self.client.define(RCListener, rcpath) - self.client.define(GlobalListener) - - def __repr__(self): - """Return representation.""" - return "<rcstream for socketio://%s@%s:%s%s>" % ( - self.wikihost, self.rchost, self.rcport, self.rcpath + super(RcListenerThread, self).__init__( + url=get_eventstreams_recentchange_url(rchost, rcport, rcpath), + filter_fn=create_wikihost_filter_fn(wikihost), + total=total ) - def run(self): - """ - Threaded function. - Runs inside the thread when started with .start(). - """ - self.running = True - while self.running: - self.client.wait(seconds=0.1) - debug('Shut down event loop for %r' % self, _logger) - self.client.disconnect() - debug('Disconnected %r' % self, _logger) - self.queue.put(None) - - def stop(self): - """Stop the thread.""" - self.running = False - - -def rc_listener(wikihost, rchost, rcport=80, rcpath='/rc', total=None): +def rc_listener(wikihost=None, rchost='https://stream.wikimedia.org', rcport=None, rcpath='/v2/stream/recentchange', total=None): """Yield changes received from RCstream. @param wikihost: the hostname of the wiki we want to get changes for. This - is passed to rcstream using a 'subscribe' command. Pass - '*' to listen to all wikis for a given rc host. 
- @param rchost: the recent changes stream host to connect to. For Wikimedia + is used to filter events for server_name. Pass + None or '*' to listen to all wikis. + @param rchost: the eventstreams host to connect to. For Wikimedia wikis, this is 'https://stream.wikimedia.org' - @param rcport: the port to connect to (default: 80) - @param rcpath: the sockets.io path. For Wikimedia wikis, this is '/rc'. - (default: '/rc') + @param rcport: the port to connect to (default: None) + @param rcpath: the recentchange stream path. For Wikimedia wikis, this is '/v2/stream/recentchange'. + (default: '/v2/stream/recentchange') @param total: the maximum number of entries to return. The underlying thread is shut down then this number is reached. @@ -179,30 +110,13 @@ @see: U{MachineReadableRCFeedFormatter<https://doc.wikimedia.org/ mediawiki-core/master/php/classMachineReadableRCFeedFormatter.html>} @rtype: generator - @raises ImportError """ - if isinstance(socketIO_client, Exception): - raise ImportError('socketIO_client is required for the rc stream;\n' - 'install it with pip install "socketIO_client==0.5.6"') - rc_thread = RcListenerThread( - wikihost=wikihost, - rchost=rchost, rcport=rcport, rcpath=rcpath, + return eventstream( + url=get_eventstreams_recentchange_url(rchost, rcport, rcpath), + filter_fn=create_wikihost_filter_fn(wikihost), total=total ) - - debug('Starting rcstream thread %r' % rc_thread, - _logger) - rc_thread.start() - - while True: - try: - element = rc_thread.queue.get(timeout=0.1) - except Empty: - continue - if element is None: - return - yield element def site_rc_listener(site, total=None): -- To view, visit https://gerrit.wikimedia.org/r/345246 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ie21d19cdb452122420885617b3270e9aecebc05b Gerrit-PatchSet: 1 Gerrit-Project: pywikibot/core Gerrit-Branch: master Gerrit-Owner: Ottomata <ao...@wikimedia.org> 
_______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits