Ottomata has submitted this change and it was merged.

Change subject: Replace pykafka reader with kafka-python, making it the default kafka handler
......................................................................


Replace pykafka reader with kafka-python, making it the default kafka handler

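This also makes the kafka-python handler's arguments more compatible
with those that the pykafka handler took (e.g. the old 'topic' argument
and the integer auto_offset_reset values -1/-2 are still accepted, with a
deprecation warning), so we won't need to coordinate config changes with
a deployment of this change.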

Kafka handlers are now addressable by their specific client names,
while whichever client we pick as the default remains addressable
as 'kafka://'.  E.g. currently kafka-python:// and kafka:// refer to the
same handler.  Along the way, to follow the same 'kafka-<client>' naming
scheme, the experimental Confluent handlers have been renamed from
'confluent-kafka' to 'kafka-confluent' (no alias is kept for the old name).

Perhaps we should just remove the confluent handlers :/

Change-Id: I5e513690a760d0b51bbf9f15bb51d0c717a49ff1
---
M eventlogging/handlers.py
1 file changed, 95 insertions(+), 132 deletions(-)

Approvals:
  Ottomata: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index c5bbdd5..830bd9f 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -69,8 +69,11 @@
         db[collection].insert(event)
 
 
+# Can be addressed as default kafka:// handler, and as specific
+# kafka client name, kafka-python://
 @writes('kafka')
-def kafka_writer(
+@writes('kafka-python')
+def kafka_python_writer(
     path,
     topic=None,
     key=None,
@@ -245,10 +248,9 @@
             response_future = None
 
 
-# NOTE: confluent-kafka is experimental, and may replace the above
-#       kafka-python based kafka:// writer.
-@writes('confluent-kafka')
-def confluent_kafka_writer(
+# NOTE: kafka-confluent is experimental.
+@writes('kafka-confluent')
+def kafka_confluent_writer(
     path,
     topic=None,
     key=None,
@@ -587,30 +589,30 @@
     return stream(udp_socket(hostname, port), raw)
 
 
+# Can be addressed as default kafka:// handler, and as specific
+# kafka client name, kafka-python://
 @reads('kafka')
-def kafka_reader(
+@reads('kafka-python')
+def kafka_python_reader(
     path,
-    topic='eventlogging',
-    identity='',
+    topics=None,
+    topic=None,
+    identity=None,
     raw=False,
-    **kafka_consumer_args
+    **kafka_args
 ):
     """
-    Reads events from Kafka.  This handler uses pykafka.
+    Reads events from Kafka.
 
     Kafka URIs look like:
-    kafka:///b1:9092,b2:9092?topic=topic_name&identity=consumer_group_name&
-    auto_commit_enable=True&auto_commit_interval_ms=1000...
+    kafka:///b1:9092,b2:9092?topics=topic1,topic2&identity=consumer_group_name&
+    &auto_commit_interval_ms=1000...
 
-    This reader uses the pykafka BalancedConsumer.  You may pass
-    any configs that BalancedConsumer takes as keyword arguments via
-    the kafka URI query params.
+    This reader uses the kafka-python KafkaConsumer.  You may pass
+    any configs that KafkaConsumer takes as keyword arguments as
+    URI query params.
 
-    The auto_commit_interval_ms is by default 60 seconds. This is pretty high
-    and may lead to more duplicate message consumption (Kafka has at atleast
-    once message delivery guarantee). Lowering this(to 1 second?) makes sure
-    that there aren't as many duplicates, but incurs the overhead of committing
-    offsets to zookeeper more often.
+    auto_commit_interval_ms is by default 5 seconds.
 
     If auto_commit_enable is True, then messages will be marked as done based
     on the auto_commit_interval_ms time period.
@@ -620,43 +622,90 @@
     that message offsets will be committed to Kafka for messages
     that have not been inserted into MySQL.  Future work
     will have to fix this problem somehow.  Perhaps a callback?
+
+    The 'topic' parameter is provided for backwards compatibility.
+    It will be used if topics is not given.
+
+    Arguments:
+        *path (str): Comma separated list of broker hostname:ports.
+
+        *topics (list): List of topics to subscribe to.
+
+        *topic (string): (deprecated) topic to subscribe to.  Use topics.
+
+        *identity (str): Used as the Kafka consumer group id, and the prefix of
+            the Kafka client id.  If not given, a new unique identity will
+            be created.
+
+        *raw (bool): If True, the generator returned will yield a stream of
+            strings, else a stream of Events.
     """
-    from pykafka import KafkaClient as PyKafkaClient
-    from pykafka import BalancedConsumer
+    # Support use of deprecated topic arg, for now.
+    if topic and not topics:
+        topics = topic
+        logging.warn('kafka \'topic\' argument is deprecated, use \'topics\'')
 
-    # Get consumer group_id based on identity.
-    (_, group_id) = kafka_ids(identity)
+    if not topics:
+        raise ValueError(
+            'Cannot consume from Kafka without providing topics.'
+        )
 
-    # Brokers should be in the uri path
-    # path.strip returns type 'unicode' and pykafka expects a string, so
-    # converting unicode to str
-    brokers = path.strip('/').encode('ascii', 'ignore')
+    from kafka import KafkaConsumer
 
-    # remove non KafkaConsumer args from kafka_consumer_args
-    kafka_consumer_args = {
-        k: v for k, v in items(kafka_consumer_args)
-        if k in inspect.getargspec(BalancedConsumer.__init__).args
+    # Get kafka client_id and group_id based on identity.
+    (client_id, group_id) = kafka_ids(identity)
+
+    # Use topics as an array.
+    if type(topics) != list:
+        topics = topics.split(',')
+
+    # remove non KafkaConsumer args from kafka_args
+    kafka_args = {
+        k: v for k, v in items(kafka_args)
+        if k in KafkaConsumer.DEFAULT_CONFIG
     }
 
-    kafka_client = PyKafkaClient(hosts=brokers)
-    kafka_topic = kafka_client.topics[topic.encode('ascii', 'ignore')]
+    # Be flexible with auto_offset_reset values.  The enum names
+    # have changed in different clients and versions, but the int
+    # values have never changed.  Allow setting this by int value on
+    # kafka-python handler.  kafka-python itself already
+    # handles the deprecated enum names 'smallest' and 'largest'.
+    if ('auto_offset_reset' in kafka_args and
+            kafka_args['auto_offset_reset'] in (-1, -2)):
+        logging.warn(
+            'kafka auto_offset_reset int values are deprecated, use '
+            'either \'earliest\' or \'latest\''
+        )
+        # Map integer values to names that kafka-python accepts.
+        kafka_args['auto_offset_reset'] = {
+            -1: 'latest',
+            -2: 'earliest',
+        }.get(kafka_args['auto_offset_reset'])
 
-    consumer = kafka_topic.get_balanced_consumer(
-        consumer_group=group_id.encode('ascii', 'ignore'),
-        **kafka_consumer_args)
+    kafka_consumer = KafkaConsumer(
+        # Brokers should be in the URI path.
+        bootstrap_servers=path.strip('/'),
+        group_id=group_id,
+        client_id=client_id,
+        **kafka_args
+    )
 
-    # Define a generator to read from the BalancedConsumer instance
-    def message_stream(consumer):
-        while True:
-            yield consumer.consume()
+    logging.info(
+        'Consuming topics %s from Kafka in group %s as %s',
+        topics,
+        kafka_consumer.config['group_id'],
+        kafka_consumer.config['client_id']
+    )
+    # Subscribe to list of topics.
+    kafka_consumer.subscribe(topics)
 
-    return stream((message.value for message in message_stream(consumer)), raw)
+    # Return a stream of message values.
+    return stream((message.value for message in kafka_consumer), raw)
 
 
-# NOTE: confluent-kafka and kafka-python readers are experimental.
-#       one may be chosen to replace the above pykafka based kafka:// reader.
-@reads('confluent-kafka')
-def confluent_kafka_reader(
+# NOTE: kafka-confluent is experimental.
+@reads('kafka-confluent')
+def kafka_confluent_reader(
     path,
     topics=None,
     topic=None,  # deprecated
@@ -808,89 +857,3 @@
 
     # Return a stream of message values.
     return stream(consume(kafka_consumer, poll_timeout), raw)
-
-
-@reads('kafka-python')
-def kafka_python_reader(
-    path,
-    topics=None,
-    identity=None,
-    raw=False,
-    **kafka_args
-):
-    """
-    Reads events from Kafka.
-
-    Kafka URIs look like:
-    kafka:///b1:9092,b2:9092?topics=topic1,topic2&identity=consumer_group_name&
-    &auto_commit_interval_ms=1000...
-
-    This reader uses the kafka-python KafkaConsumer.  You may pass
-    any configs that KafkaConsumer takes as keyword arguments as
-    URI query params.
-
-    auto_commit_interval_ms is by default 5 seconds.
-
-    If auto_commit_enable is True, then messages will be marked as done based
-    on the auto_commit_interval_ms time period.
-    This has the downside of committing message offsets before
-    work might be actually complete.  E.g. if inserting into MySQL, and
-    the process dies somewhere along the way, it is possible
-    that message offsets will be committed to Kafka for messages
-    that have not been inserted into MySQL.  Future work
-    will have to fix this problem somehow.  Perhaps a callback?
-
-    The 'topic' parameter is provided for backwards compatibility.
-    It will be used if topics is not given.
-
-    Arguments:
-        *path (str): Comma separated list of broker hostname:ports.
-
-        *topics (list): List of topics to subscribe to.
-
-        *identity (str): Used as the Kafka consumer group id, and the prefix of
-            the Kafka client id.  If not given, a new unique identity will
-            be created.
-
-        *raw (bool): If True, the generator returned will yield a stream of
-            strings, else a stream of Events.
-    """
-    if not topics:
-        raise ValueError(
-            'Cannot consume from Kafka without providing topics.'
-        )
-
-    from kafka import KafkaConsumer
-
-    # Get kafka client_id and group_id based on identity.
-    (client_id, group_id) = kafka_ids(identity)
-
-    # Use topics as an array.
-    if type(topics) != list:
-        topics = topics.split(',')
-
-    # remove non KafkaConsumer args from kafka_args
-    kafka_args = {
-        k: v for k, v in items(kafka_args)
-        if k in inspect.getargspec(KafkaConsumer.__init__).args
-    }
-
-    kafka_consumer = KafkaConsumer(
-        # Brokers should be in the URI path.
-        bootstrap_servers=path.strip('/'),
-        group_id=group_id,
-        client_id=client_id,
-        **kafka_args
-    )
-
-    logging.info(
-        'Consuming topics %s from Kafka in group %s as %s',
-        topics,
-        kafka_consumer.config['group_id'],
-        kafka_consumer.config['client_id']
-    )
-    # Subscribe to list of topics.
-    kafka_consumer.subscribe(topics)
-
-    # Return a stream of message values.
-    return stream((message.value for message in kafka_consumer), raw)

-- 
To view, visit https://gerrit.wikimedia.org/r/303177
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5e513690a760d0b51bbf9f15bb51d0c717a49ff1
Gerrit-PatchSet: 7
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to