Nuria has submitted this change and it was merged. Change subject: Add support for kafka reader and writer, make eventlogging-forwarder use generic uri input ......................................................................
Add support for kafka reader and writer, make eventlogging-forwarder use generic uri input This change adds support for generic iterators to the streams abstraction: stream() will now work with any iterator of strings, not just ZeroMQ sockets and file objects. By supporting generic iterators, we can wrap a Kafka message iterator with stream() and get the same Unicode decoding and JSON parsing support that all other streams thus far could use. eventlogging-forwarder now takes two arguments, the first of which is an input URI. This allows forwarding from any type of input directly to a ZeroMQ PUB socket, including Kafka URIs. I would also like to make eventlogging-processor more generic, so that it could read directly from Kafka, but I will do so in a subsequent commit. This Kafka reader and writer are based on version 0.9.3 of kafka-python, found at https://github.com/mumrah/kafka-python/tree/v0.9.3. I have created a deb package of this version. Change-Id: I89bf383bd7cc1af603fd38b89816c68922491527 --- M server/bin/eventlogging-forwarder M server/eventlogging/compat.py M server/eventlogging/handlers.py M server/eventlogging/streams.py M server/eventlogging/utils.py M server/requirements.txt M server/setup.py 7 files changed, 76 insertions(+), 21 deletions(-) Approvals: Nuria: Looks good to me, approved diff --git a/server/bin/eventlogging-forwarder b/server/bin/eventlogging-forwarder index e478714..e249d3f 100755 --- a/server/bin/eventlogging-forwarder +++ b/server/bin/eventlogging-forwarder @@ -3,17 +3,22 @@ """ eventlogging-forwarder ---------------------- - UDP -> ZeroMQ socket forwarding. Reads line-oriented input from UDP socket - and writes it to a ZeroMQ TCP PUB socket bound to the same port number. + Arbitrary input -> ZeroMQ socket forwarding. Reads line-oriented input from + an input stream and writes it to a ZeroMQ TCP PUB socket bound to the same + port number.
Because ZeroMQ is message-oriented, we cannot simply use recv_into to read - bytes from the UDP socket into the ZMQ socket. We use socket.makefile() to + bytes from the input into the ZMQ socket. We use socket.makefile() to facilitate reading and writing whole lines. - usage: eventlogging-udp-zmq-forwarder [-h] port + usage: eventlogging-forwarder [-h] input port positional arguments: - port Port to forward + input input uri + Examples: + udp://localhost:8599 + kafka://?brokers=broker1:9092,broker2:9092&topic=foo&group_id=bar + port ZeroMQ TCP port to forward to optional arguments: -h, --help show this help message and exit @@ -31,11 +36,14 @@ import argparse import logging -from eventlogging import iter_unicode, pub_socket, udp_socket, setup_logging +from eventlogging import pub_socket, setup_logging, get_reader, uri_force_raw ap = argparse.ArgumentParser(description='ZeroMQ UDP => PUB Device', fromfile_prefix_chars='@') +# Forward raw events. This keeps the reader +# attempting to parse the input as json. 
+ap.add_argument('input', help='URI of raw input stream', type=uri_force_raw) ap.add_argument('port', type=int, help='Port to forward') ap.add_argument('--count', action='store_true', help='Prepend an autoincrementing ID to each message') @@ -43,12 +51,13 @@ setup_logging() -logging.info('Forwarding udp:%d => tcp:%d...', args.port, args.port) +logging.info('Forwarding %s => tcp:%d...', args.input, args.port) +input_stream = get_reader(args.input) sock_out = pub_socket(args.port) -sock_in = udp_socket('0.0.0.0', args.port) -f = iter_unicode(sock_in) if args.count: - f = (str(id) + '\t' + msg for id, msg in enumerate(f)) -for line in f: + input_stream = ( + '{0}\t{1}'.format(str(id), msg) for id, msg in enumerate(input_stream) + ) +for line in input_stream: sock_out.send_string(line) diff --git a/server/eventlogging/compat.py b/server/eventlogging/compat.py index 2a7e355..70e783c 100644 --- a/server/eventlogging/compat.py +++ b/server/eventlogging/compat.py @@ -36,20 +36,20 @@ __all__ = ('http_get', 'integer_types', 'items', 'json', 'monotonic_clock', - 'string_types', 'unquote_plus', 'urisplit', 'urlopen', 'uuid5') + 'string_types', 'unquote_plus', 'urisplit', 'urlopen', 'urlencode', 'uuid5') PY3 = sys.version_info[0] == 3 if PY3: items = operator.methodcaller('items') from urllib.request import urlopen - from urllib.parse import (unquote_to_bytes as unquote, urlsplit, + from urllib.parse import (unquote_to_bytes as unquote, urlsplit, urlencode, parse_qsl, SplitResult) string_types = str, integer_types = int, else: items = operator.methodcaller('iteritems') - from urllib import unquote + from urllib import unquote, urlencode from urllib2 import urlopen from urlparse import urlsplit, parse_qsl, SplitResult string_types = basestring, diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py index 2432b37..eee98f0 100644 --- a/server/eventlogging/handlers.py +++ b/server/eventlogging/handlers.py @@ -69,8 +69,8 @@ @writes('kafka') def 
kafka_writer(brokers, topic='eventlogging'): """Write events to Kafka, keyed by SCID.""" - from kafka.client import KafkaClient - from kafka.producer import KeyedProducer + from kafka import KafkaClient + from kafka import KeyedProducer kafka = KafkaClient(brokers) producer = KeyedProducer(kafka) @@ -199,6 +199,23 @@ @reads('udp') -def udp_reader(uri, raw=False): +def udp_reader(hostname, port, raw=False): """Reads data from a UDP socket.""" - return stream(udp_socket(uri), raw) + return stream(udp_socket(hostname, port), raw) + + +@reads('kafka') +def kafka_reader( + brokers, + topic='eventlogging', + group_id='eventlogging', + raw=False +): + """Reads events from Kafka""" + from kafka import KafkaConsumer + consumer = KafkaConsumer( + topic, + group_id=group_id, + metadata_broker_list=brokers + ) + return stream((message.value for message in consumer), raw) diff --git a/server/eventlogging/streams.py b/server/eventlogging/streams.py index b3f8a71..6254c83 100644 --- a/server/eventlogging/streams.py +++ b/server/eventlogging/streams.py @@ -87,7 +87,10 @@ """Iterator; read and decode unicode strings from a stream.""" if hasattr(stream, 'recv_unicode'): return iter(stream.recv_unicode, None) - return iter_file(stream) + elif hasattr(stream, 'fileno'): + return iter_file(stream) + else: + return (line.decode('utf-8', 'replace') for line in stream) def iter_json(stream): diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py index ed4d6e1..6516f36 100644 --- a/server/eventlogging/utils.py +++ b/server/eventlogging/utils.py @@ -16,13 +16,13 @@ import threading import traceback -from .compat import items, monotonic_clock +from .compat import items, monotonic_clock, urisplit, urlencode from .factory import get_reader __all__ = ('EventConsumer', 'PeriodicThread', 'flatten', 'is_subset_dict', 'setup_logging', 'unflatten', 'update_recursive', - 'uri_delete_query_item') + 'uri_delete_query_item', 'uri_append_query_items', 'uri_force_raw') class 
PeriodicThread(threading.Thread): @@ -75,6 +75,28 @@ separator, trailing_ampersand = match.groups() return separator if trailing_ampersand else '' return re.sub('([?&])%s=[^&]*(&?)' % re.escape(key), repl, uri) + + +def uri_append_query_items(uri, params): + """ + Appends uri with the dict params as key=value pairs using + urlencode and returns the result. + """ + return "{0}{1}{2}".format( + uri, + '&' if urisplit(uri).query else '?', + urlencode(params) + ) + + +def uri_force_raw(uri): + """ + Returns a uri that sets raw=True as a query param if it isn't already set. + """ + if 'raw=True' not in uri: + return uri_append_query_items(uri, {'raw': True}) + else: + return uri def is_subset_dict(a, b): @@ -169,3 +191,5 @@ def setup_logging(): logging.basicConfig(stream=sys.stderr, level=logging.DEBUG, format='%(asctime)s (%(threadName)-10s) %(message)s') + # Set kafka module logging level to INFO, DEBUG is too noisy. + logging.getLogger("kafka").setLevel(logging.INFO) diff --git a/server/requirements.txt b/server/requirements.txt index 3dc0456..f274d72 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -2,3 +2,4 @@ pygments>=1.5 pyzmq>=2.1 sqlalchemy>=0.7 +kafka-python>=0.9.3 \ No newline at end of file diff --git a/server/setup.py b/server/setup.py index 85ba480..2933fc1 100644 --- a/server/setup.py +++ b/server/setup.py @@ -58,5 +58,6 @@ "pygments>=1.5", "pyzmq>=2.1", "sqlalchemy>=0.7", + "kafka-python>=0.9.3" ) ) -- To view, visit https://gerrit.wikimedia.org/r/196073 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I89bf383bd7cc1af603fd38b89816c68922491527 Gerrit-PatchSet: 9 Gerrit-Project: mediawiki/extensions/EventLogging Gerrit-Branch: master Gerrit-Owner: Ottomata <[email protected]> Gerrit-Reviewer: Mforns <[email protected]> Gerrit-Reviewer: Milimetric <[email protected]> Gerrit-Reviewer: Nuria <[email protected]> Gerrit-Reviewer: Ori.livneh <[email protected]> Gerrit-Reviewer: 
Ottomata <[email protected]> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
