Nuria has submitted this change and it was merged.

Change subject: Add support for kafka reader and writer, make 
eventlogging-forwarder use generic uri input
......................................................................


Add support for kafka reader and writer, make eventlogging-forwarder use 
generic uri input

This change adds support to the streams abstraction for generic iterators:
stream() will now work with any iterator of strings, not just zeromq and file
objects.

By supporting generic iterators, we can wrap a kafka message iterator with 
stream(),
and get the same support for unicode decoding and json parsing that all other 
streams
thus far could use.

eventlogging-forwarder now takes 2 arguments, the first of which is an input 
uri.  This
will allow forwarding from any type of input directly to a ZeroMQ PUB socket, 
including
kafka uris.

I would also like to make eventlogging-processor more generic, so it could
potentially read directly from Kafka, but I will do so in a subsequent commit.

This Kafka reader and writer is based on the 0.9.3 version of kafka-python, 
found at
https://github.com/mumrah/kafka-python/tree/v0.9.3. I have created a deb 
package of this version.

Change-Id: I89bf383bd7cc1af603fd38b89816c68922491527
---
M server/bin/eventlogging-forwarder
M server/eventlogging/compat.py
M server/eventlogging/handlers.py
M server/eventlogging/streams.py
M server/eventlogging/utils.py
M server/requirements.txt
M server/setup.py
7 files changed, 76 insertions(+), 21 deletions(-)

Approvals:
  Nuria: Looks good to me, approved



diff --git a/server/bin/eventlogging-forwarder 
b/server/bin/eventlogging-forwarder
index e478714..e249d3f 100755
--- a/server/bin/eventlogging-forwarder
+++ b/server/bin/eventlogging-forwarder
@@ -3,17 +3,22 @@
 """
   eventlogging-forwarder
   ----------------------
-  UDP -> ZeroMQ socket forwarding. Reads line-oriented input from UDP socket
-  and writes it to a ZeroMQ TCP PUB socket bound to the same port number.
+  Arbitrary input -> ZeroMQ socket forwarding. Reads line-oriented input from
+  an input stream and writes it to a ZeroMQ TCP PUB socket bound to the same
+  port number.
 
   Because ZeroMQ is message-oriented, we cannot simply use recv_into to read
-  bytes from the UDP socket into the ZMQ socket. We use socket.makefile() to
+  bytes from the input into the ZMQ socket. We use socket.makefile() to
   facilitate reading and writing whole lines.
 
-  usage: eventlogging-udp-zmq-forwarder [-h] port
+  usage: eventlogging-forwarder [-h] input port
 
   positional arguments:
-    port        Port to forward
+    input    input uri
+      Examples:
+        udp://localhost:8599
+        kafka://?brokers=broker1:9092,broker2:9092&topic=foo&group_id=bar
+    port     ZeroMQ TCP port to forward to
 
   optional arguments:
     -h, --help  show this help message and exit
@@ -31,11 +36,14 @@
 import argparse
 import logging
 
-from eventlogging import iter_unicode, pub_socket, udp_socket, setup_logging
+from eventlogging import pub_socket, setup_logging, get_reader, uri_force_raw
 
 
 ap = argparse.ArgumentParser(description='ZeroMQ UDP => PUB Device',
                              fromfile_prefix_chars='@')
+# Forward raw events.  This keeps the reader
+# attempting to parse the input as json.
+ap.add_argument('input', help='URI of raw input stream', type=uri_force_raw)
 ap.add_argument('port', type=int, help='Port to forward')
 ap.add_argument('--count', action='store_true',
                 help='Prepend an autoincrementing ID to each message')
@@ -43,12 +51,13 @@
 
 setup_logging()
 
-logging.info('Forwarding udp:%d => tcp:%d...', args.port, args.port)
+logging.info('Forwarding %s => tcp:%d...', args.input, args.port)
+input_stream = get_reader(args.input)
 sock_out = pub_socket(args.port)
-sock_in = udp_socket('0.0.0.0', args.port)
 
-f = iter_unicode(sock_in)
 if args.count:
-    f = (str(id) + '\t' + msg for id, msg in enumerate(f))
-for line in f:
+    input_stream = (
+        '{0}\t{1}'.format(str(id), msg) for id, msg in enumerate(input_stream)
+    )
+for line in input_stream:
     sock_out.send_string(line)
diff --git a/server/eventlogging/compat.py b/server/eventlogging/compat.py
index 2a7e355..70e783c 100644
--- a/server/eventlogging/compat.py
+++ b/server/eventlogging/compat.py
@@ -36,20 +36,20 @@
 
 
 __all__ = ('http_get', 'integer_types', 'items', 'json', 'monotonic_clock',
-           'string_types', 'unquote_plus', 'urisplit', 'urlopen', 'uuid5')
+           'string_types', 'unquote_plus', 'urisplit', 'urlopen', 'urlencode', 
'uuid5')
 
 PY3 = sys.version_info[0] == 3
 
 if PY3:
     items = operator.methodcaller('items')
     from urllib.request import urlopen
-    from urllib.parse import (unquote_to_bytes as unquote, urlsplit,
+    from urllib.parse import (unquote_to_bytes as unquote, urlsplit, urlencode,
                               parse_qsl, SplitResult)
     string_types = str,
     integer_types = int,
 else:
     items = operator.methodcaller('iteritems')
-    from urllib import unquote
+    from urllib import unquote, urlencode
     from urllib2 import urlopen
     from urlparse import urlsplit, parse_qsl, SplitResult
     string_types = basestring,
diff --git a/server/eventlogging/handlers.py b/server/eventlogging/handlers.py
index 2432b37..eee98f0 100644
--- a/server/eventlogging/handlers.py
+++ b/server/eventlogging/handlers.py
@@ -69,8 +69,8 @@
 @writes('kafka')
 def kafka_writer(brokers, topic='eventlogging'):
     """Write events to Kafka, keyed by SCID."""
-    from kafka.client import KafkaClient
-    from kafka.producer import KeyedProducer
+    from kafka import KafkaClient
+    from kafka import KeyedProducer
 
     kafka = KafkaClient(brokers)
     producer = KeyedProducer(kafka)
@@ -199,6 +199,23 @@
 
 
 @reads('udp')
-def udp_reader(uri, raw=False):
+def udp_reader(hostname, port, raw=False):
     """Reads data from a UDP socket."""
-    return stream(udp_socket(uri), raw)
+    return stream(udp_socket(hostname, port), raw)
+
+
+@reads('kafka')
+def kafka_reader(
+    brokers,
+    topic='eventlogging',
+    group_id='eventlogging',
+    raw=False
+):
+    """Reads events from Kafka"""
+    from kafka import KafkaConsumer
+    consumer = KafkaConsumer(
+        topic,
+        group_id=group_id,
+        metadata_broker_list=brokers
+    )
+    return stream((message.value for message in consumer), raw)
diff --git a/server/eventlogging/streams.py b/server/eventlogging/streams.py
index b3f8a71..6254c83 100644
--- a/server/eventlogging/streams.py
+++ b/server/eventlogging/streams.py
@@ -87,7 +87,10 @@
     """Iterator; read and decode unicode strings from a stream."""
     if hasattr(stream, 'recv_unicode'):
         return iter(stream.recv_unicode, None)
-    return iter_file(stream)
+    elif hasattr(stream, 'fileno'):
+        return iter_file(stream)
+    else:
+        return (line.decode('utf-8', 'replace') for line in stream)
 
 
 def iter_json(stream):
diff --git a/server/eventlogging/utils.py b/server/eventlogging/utils.py
index ed4d6e1..6516f36 100644
--- a/server/eventlogging/utils.py
+++ b/server/eventlogging/utils.py
@@ -16,13 +16,13 @@
 import threading
 import traceback
 
-from .compat import items, monotonic_clock
+from .compat import items, monotonic_clock, urisplit, urlencode
 from .factory import get_reader
 
 
 __all__ = ('EventConsumer', 'PeriodicThread', 'flatten', 'is_subset_dict',
            'setup_logging', 'unflatten', 'update_recursive',
-           'uri_delete_query_item')
+           'uri_delete_query_item', 'uri_append_query_items', 'uri_force_raw')
 
 
 class PeriodicThread(threading.Thread):
@@ -75,6 +75,28 @@
         separator, trailing_ampersand = match.groups()
         return separator if trailing_ampersand else ''
     return re.sub('([?&])%s=[^&]*(&?)' % re.escape(key), repl, uri)
+
+
+def uri_append_query_items(uri, params):
+    """
+    Appends uri with the dict params as key=value pairs using
+    urlencode and returns the result.
+    """
+    return "{0}{1}{2}".format(
+        uri,
+        '&' if urisplit(uri).query else '?',
+        urlencode(params)
+    )
+
+
+def uri_force_raw(uri):
+    """
+    Returns a uri that sets raw=True as a query param if it isn't already set.
+    """
+    if 'raw=True' not in uri:
+        return uri_append_query_items(uri, {'raw': True})
+    else:
+        return uri
 
 
 def is_subset_dict(a, b):
@@ -169,3 +191,5 @@
 def setup_logging():
     logging.basicConfig(stream=sys.stderr, level=logging.DEBUG,
                         format='%(asctime)s (%(threadName)-10s) %(message)s')
+    # Set kafka module logging level to INFO, DEBUG is too noisy.
+    logging.getLogger("kafka").setLevel(logging.INFO)
diff --git a/server/requirements.txt b/server/requirements.txt
index 3dc0456..f274d72 100644
--- a/server/requirements.txt
+++ b/server/requirements.txt
@@ -2,3 +2,4 @@
 pygments>=1.5
 pyzmq>=2.1
 sqlalchemy>=0.7
+kafka-python>=0.9.3
\ No newline at end of file
diff --git a/server/setup.py b/server/setup.py
index 85ba480..2933fc1 100644
--- a/server/setup.py
+++ b/server/setup.py
@@ -58,5 +58,6 @@
         "pygments>=1.5",
         "pyzmq>=2.1",
         "sqlalchemy>=0.7",
+        "kafka-python>=0.9.3"
     )
 )

-- 
To view, visit https://gerrit.wikimedia.org/r/196073
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I89bf383bd7cc1af603fd38b89816c68922491527
Gerrit-PatchSet: 9
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ori.livneh <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to