Ottomata has submitted this change and it was merged.
Change subject: Replace pykafka reader with kafka-python, making it the default kafka handler
......................................................................
Replace pykafka reader with kafka-python, making it the default kafka handler
This also makes the kafka-python handler's arguments more compatible
with those that the pykafka handler took, so we won't need to coordinate
config changes with a deployment of this change.
Kafka handlers are now addressable by their specific client names, while
whichever client we pick as the default kafka handler will remain addressable
as 'kafka://'. E.g. currently kafka-python:// and kafka:// refer to the
same handler. Along the way, to keep naming consistent, the experimental
confluent handlers have been renamed from 'confluent-kafka'
to 'kafka-confluent'.
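For example (illustrative URIs; the broker name is made up for this note):

  kafka:///broker1:9092?topics=eventlogging            # default handler (currently kafka-python)
  kafka-python:///broker1:9092?topics=eventlogging     # same handler, addressed explicitly
  kafka-confluent:///broker1:9092?topics=eventlogging  # experimental confluent handler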
Perhaps we should just remove the confluent handlers :/
Change-Id: I5e513690a760d0b51bbf9f15bb51d0c717a49ff1
---
M eventlogging/handlers.py
1 file changed, 95 insertions(+), 132 deletions(-)
Approvals:
Ottomata: Looks good to me, approved
jenkins-bot: Verified
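For reference, here is a minimal sketch of driving the reader described in the
diff below through eventlogging's handler factory. This assumes the module's
get_reader factory; the broker, topic, and group values are invented for
illustration and are not part of this change:

  import eventlogging

  # kafka:// now resolves to the kafka-python reader. Any config that
  # kafka-python's KafkaConsumer accepts can be appended as URI query params.
  uri = ('kafka:///broker1:9092,broker2:9092'
         '?topics=eventlogging'
         '&identity=my_consumer_group'
         '&auto_offset_reset=earliest')

  # Yields one Event per Kafka message consumed (or raw strings with raw=True).
  for event in eventlogging.get_reader(uri):
      print(event)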
diff --git a/eventlogging/handlers.py b/eventlogging/handlers.py
index c5bbdd5..830bd9f 100644
--- a/eventlogging/handlers.py
+++ b/eventlogging/handlers.py
@@ -69,8 +69,11 @@
db[collection].insert(event)
+# Can be addressed as the default kafka:// handler, or by the specific
+# kafka client name, kafka-python://
@writes('kafka')
-def kafka_writer(
+@writes('kafka-python')
+def kafka_python_writer(
path,
topic=None,
key=None,
@@ -245,10 +248,9 @@
response_future = None
-# NOTE: confluent-kafka is experimental, and may replace the above
-# kafka-python based kafka:// writer.
-@writes('confluent-kafka')
-def confluent_kafka_writer(
+# NOTE: kafka-confluent is experimental.
+@writes('kafka-confluent')
+def kafka_confluent_writer(
path,
topic=None,
key=None,
@@ -587,30 +589,30 @@
return stream(udp_socket(hostname, port), raw)
+# Can be addressed as the default kafka:// handler, or by the specific
+# kafka client name, kafka-python://
@reads('kafka')
-def kafka_reader(
+@reads('kafka-python')
+def kafka_python_reader(
path,
- topic='eventlogging',
- identity='',
+ topics=None,
+ topic=None,
+ identity=None,
raw=False,
- **kafka_consumer_args
+ **kafka_args
):
"""
- Reads events from Kafka. This handler uses pykafka.
+ Reads events from Kafka.
Kafka URIs look like:
- kafka:///b1:9092,b2:9092?topic=topic_name&identity=consumer_group_name&
- auto_commit_enable=True&auto_commit_interval_ms=1000...
+        kafka:///b1:9092,b2:9092?topics=topic1,topic2&identity=consumer_group_name
+        &auto_commit_interval_ms=1000...
- This reader uses the pykafka BalancedConsumer. You may pass
- any configs that BalancedConsumer takes as keyword arguments via
- the kafka URI query params.
+ This reader uses the kafka-python KafkaConsumer. You may pass
+ any configs that KafkaConsumer takes as keyword arguments as
+ URI query params.
- The auto_commit_interval_ms is by default 60 seconds. This is pretty high
- and may lead to more duplicate message consumption (Kafka has at atleast
- once message delivery guarantee). Lowering this(to 1 second?) makes sure
- that there aren't as many duplicates, but incurs the overhead of committing
- offsets to zookeeper more often.
+ auto_commit_interval_ms is by default 5 seconds.
If auto_commit_enable is True, then messages will be marked as done based
on the auto_commit_interval_ms time period.
@@ -620,43 +622,90 @@
that message offsets will be committed to Kafka for messages
that have not been inserted into MySQL. Future work
will have to fix this problem somehow. Perhaps a callback?
+
+ The 'topic' parameter is provided for backwards compatibility.
+ It will be used if topics is not given.
+
+ Arguments:
+ *path (str): Comma separated list of broker hostname:ports.
+
+ *topics (list): List of topics to subscribe to.
+
+ *topic (string): (deprecated) topic to subscribe to. Use topics.
+
+ *identity (str): Used as the Kafka consumer group id, and the prefix of
+ the Kafka client id. If not given, a new unique identity will
+ be created.
+
+ *raw (bool): If True, the generator returned will yield a stream of
+ strings, else a stream of Events.
"""
- from pykafka import KafkaClient as PyKafkaClient
- from pykafka import BalancedConsumer
+ # Support use of deprecated topic arg, for now.
+ if topic and not topics:
+ topics = topic
+ logging.warn('kafka \'topic\' argument is deprecated, use \'topics\'')
- # Get consumer group_id based on identity.
- (_, group_id) = kafka_ids(identity)
+ if not topics:
+ raise ValueError(
+ 'Cannot consume from Kafka without providing topics.'
+ )
- # Brokers should be in the uri path
- # path.strip returns type 'unicode' and pykafka expects a string, so
- # converting unicode to str
- brokers = path.strip('/').encode('ascii', 'ignore')
+ from kafka import KafkaConsumer
- # remove non KafkaConsumer args from kafka_consumer_args
- kafka_consumer_args = {
- k: v for k, v in items(kafka_consumer_args)
- if k in inspect.getargspec(BalancedConsumer.__init__).args
+ # Get kafka client_id and group_id based on identity.
+ (client_id, group_id) = kafka_ids(identity)
+
+ # Use topics as an array.
+ if type(topics) != list:
+ topics = topics.split(',')
+
+ # remove non KafkaConsumer args from kafka_args
+ kafka_args = {
+ k: v for k, v in items(kafka_args)
+ if k in KafkaConsumer.DEFAULT_CONFIG
}
- kafka_client = PyKafkaClient(hosts=brokers)
- kafka_topic = kafka_client.topics[topic.encode('ascii', 'ignore')]
+ # Be flexible with auto_offset_reset values. The enum names
+ # have changed in different clients and versions, but the int
+    # values have never changed. Allow setting this by int value on the
+    # kafka-python handler. kafka-python itself already
+ # handles the deprecated enum names 'smallest' and 'largest'.
+ if ('auto_offset_reset' in kafka_args and
+ kafka_args['auto_offset_reset'] in (-1, -2)):
+ logging.warn(
+ 'kafka auto_offset_reset int values are deprecated, use '
+ 'either \'earliest\' or \'latest\''
+ )
+ # Map integer values to names that kafka-python accepts.
+ kafka_args['auto_offset_reset'] = {
+ -1: 'latest',
+ -2: 'earliest',
+ }.get(kafka_args['auto_offset_reset'])
- consumer = kafka_topic.get_balanced_consumer(
- consumer_group=group_id.encode('ascii', 'ignore'),
- **kafka_consumer_args)
+ kafka_consumer = KafkaConsumer(
+ # Brokers should be in the URI path.
+ bootstrap_servers=path.strip('/'),
+ group_id=group_id,
+ client_id=client_id,
+ **kafka_args
+ )
- # Define a generator to read from the BalancedConsumer instance
- def message_stream(consumer):
- while True:
- yield consumer.consume()
+ logging.info(
+ 'Consuming topics %s from Kafka in group %s as %s',
+ topics,
+ kafka_consumer.config['group_id'],
+ kafka_consumer.config['client_id']
+ )
+ # Subscribe to list of topics.
+ kafka_consumer.subscribe(topics)
- return stream((message.value for message in message_stream(consumer)), raw)
+ # Return a stream of message values.
+ return stream((message.value for message in kafka_consumer), raw)
-# NOTE: confluent-kafka and kafka-python readers are experimental.
-# one may be chosen to replace the above pykafka based kafka:// reader.
-@reads('confluent-kafka')
-def confluent_kafka_reader(
+# NOTE: kafka-confluent is experimental.
+@reads('kafka-confluent')
+def kafka_confluent_reader(
path,
topics=None,
topic=None, # deprecated
@@ -808,89 +857,3 @@
# Return a stream of message values.
return stream(consume(kafka_consumer, poll_timeout), raw)
-
-
-@reads('kafka-python')
-def kafka_python_reader(
- path,
- topics=None,
- identity=None,
- raw=False,
- **kafka_args
-):
- """
- Reads events from Kafka.
-
- Kafka URIs look like:
- kafka:///b1:9092,b2:9092?topics=topic1,topic2&identity=consumer_group_name&
- &auto_commit_interval_ms=1000...
-
- This reader uses the kafka-python KafkaConsumer. You may pass
- any configs that KafkaConsumer takes as keyword arguments as
- URI query params.
-
- auto_commit_interval_ms is by default 5 seconds.
-
- If auto_commit_enable is True, then messages will be marked as done based
- on the auto_commit_interval_ms time period.
- This has the downside of committing message offsets before
- work might be actually complete. E.g. if inserting into MySQL, and
- the process dies somewhere along the way, it is possible
- that message offsets will be committed to Kafka for messages
- that have not been inserted into MySQL. Future work
- will have to fix this problem somehow. Perhaps a callback?
-
- The 'topic' parameter is provided for backwards compatibility.
- It will be used if topics is not given.
-
- Arguments:
- *path (str): Comma separated list of broker hostname:ports.
-
- *topics (list): List of topics to subscribe to.
-
- *identity (str): Used as the Kafka consumer group id, and the prefix of
- the Kafka client id. If not given, a new unique identity will
- be created.
-
- *raw (bool): If True, the generator returned will yield a stream of
- strings, else a stream of Events.
- """
- if not topics:
- raise ValueError(
- 'Cannot consume from Kafka without providing topics.'
- )
-
- from kafka import KafkaConsumer
-
- # Get kafka client_id and group_id based on identity.
- (client_id, group_id) = kafka_ids(identity)
-
- # Use topics as an array.
- if type(topics) != list:
- topics = topics.split(',')
-
- # remove non KafkaConsumer args from kafka_args
- kafka_args = {
- k: v for k, v in items(kafka_args)
- if k in inspect.getargspec(KafkaConsumer.__init__).args
- }
-
- kafka_consumer = KafkaConsumer(
- # Brokers should be in the URI path.
- bootstrap_servers=path.strip('/'),
- group_id=group_id,
- client_id=client_id,
- **kafka_args
- )
-
- logging.info(
- 'Consuming topics %s from Kafka in group %s as %s',
- topics,
- kafka_consumer.config['group_id'],
- kafka_consumer.config['client_id']
- )
- # Subscribe to list of topics.
- kafka_consumer.subscribe(topics)
-
- # Return a stream of message values.
- return stream((message.value for message in kafka_consumer), raw)
--
To view, visit https://gerrit.wikimedia.org/r/303177
Gerrit-MessageType: merged
Gerrit-Change-Id: I5e513690a760d0b51bbf9f15bb51d0c717a49ff1
Gerrit-PatchSet: 7
Gerrit-Project: eventlogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>
Gerrit-Reviewer: jenkins-bot <>