Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r54774473
--- Diff:
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaSpout.class);
+ private static final Comparator<MessageId> OFFSET_COMPARATOR = new
OffsetComparator();
+
+ // Storm
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private KafkaConsumer<K, V> kafkaConsumer;
+ private transient boolean consumerAutoCommitMode;
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+
+ // Bookkeeping
+ private KafkaSpoutStreams kafkaSpoutStreams;
+ private KafkaTupleBuilder<K, V> tupleBuilder;
+ private transient Timer timer; //
timer == null for auto commit mode
+ private transient Map<TopicPartition, OffsetEntry> acked; //
emitted tuples that were successfully acked. These tuples will be committed
periodically when the timer expires, on consumer rebalance, or on
close/deactivate
+ private transient int maxRetries; //
Max number of times a tuple is retried
+ private transient boolean initialized; // Flag indicating
that the spout is still undergoing initialization process.
+ // Initialization is
only complete after the first call to
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig,
KafkaSpoutStreams kafkaSpoutStreams, KafkaTupleBuilder<K, V> tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass
in configuration
+ this.kafkaSpoutStreams = kafkaSpoutStreams;
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
+ initialized = false;
+
+ // Spout internals
+ this.collector = collector;
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+
+ // Offset management
+ firstPollOffsetStrategy =
kafkaSpoutConfig.getFirstPollOffsetStrategy();
+ consumerAutoCommitMode =
kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+ if (!consumerAutoCommitMode) { // If it is auto commit, no
need to commit offsets manually
+ timer = new Timer(500,
kafkaSpoutConfig.getOffsetsCommitFreqMs(), TimeUnit.MILLISECONDS);
+ acked = new HashMap<>();
+ }
+
+ // Kafka consumer
+ kafkaConsumer = new
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(),
kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(),
new KafkaSpoutConsumerRebalanceListener());
+ // Initial poll to get the consumer registration process going.
+ // KafkaSpoutConsumerRebalanceListener will be called following
this poll, upon partition registration
+ kafkaConsumer.poll(0);
+
+ LOG.debug("Kafka Spout opened with the following configuration:
{}", kafkaSpoutConfig);
+ }
+
+ // =========== Consumer Rebalance Listener - On the same thread as the
caller ===========
+
+ private class KafkaSpoutConsumerRebalanceListener implements
ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ LOG.debug("Partitions revoked. [consumer-group={},
consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer,
partitions);
+ if (!consumerAutoCommitMode && initialized) {
+ initialized = false;
+ commitOffsetsForAckedTuples();
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ LOG.debug("Partitions reassignment. [consumer-group={},
consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer,
partitions);
+
+ initialize(partitions);
+ }
+
+ private void initialize(Collection<TopicPartition> partitions) {
+ acked.keySet().retainAll(partitions); // remove from acked
all partitions that are no longer assigned to this spout
+ for (TopicPartition tp : partitions) {
+ final OffsetAndMetadata committedOffset =
kafkaConsumer.committed(tp);
+ final long fetchOffset = doSeek(tp, committedOffset);
+ setAcked(tp, fetchOffset);
+ }
+ initialized = true;
+ LOG.debug("Initialization complete");
+ }
+
+ /**
+ * sets the cursor to the location dictated by the first poll
strategy and returns the fetch offset
+ */
+ private long doSeek(TopicPartition tp, OffsetAndMetadata
committedOffset) {
+ long fetchOffset;
+ if (committedOffset != null) { // offset was
committed for this TopicPartition
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else {
+ // do nothing - by default polling starts at the last
committed offset
+ fetchOffset = committedOffset.offset();
+ }
+ } else { // no commits have ever been done, so start at the
beginning or end depending on the strategy
+ if (firstPollOffsetStrategy.equals(EARLIEST) ||
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST) ||
firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ }
+ fetchOffset = kafkaConsumer.position(tp);
+ }
+ return fetchOffset;
+ }
+ }
+
+ private void setAcked(TopicPartition tp, long fetchOffset) {
+ // If this partition was previously assigned to this spout, leave
the acked offsets as they were to resume where it left off
+ if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ }
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ } else {
+ emitTuples(poll());
+ }
+ } else {
+ LOG.debug("Spout not initialized. Not sending tuples until
initialization completes");
+ }
+ }
+
+ private boolean commit() {
+ return !consumerAutoCommitMode && timer.isExpiredResetOnTrue();
// timer != null for non auto commit mode
+ }
+
+ private ConsumerRecords<K, V> poll() {
+ final ConsumerRecords<K, V> consumerRecords =
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ LOG.debug("Polled [{}] records from Kafka",
consumerRecords.count());
+ return consumerRecords;
+ }
+
+ private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ final Iterable<ConsumerRecord<K, V>> records =
consumerRecords.records(tp.topic());
+
+ for (final ConsumerRecord<K, V> record : records) {
+ if (record.offset() == 0 || record.offset() >
acked.get(tp).committedOffset) { // The first poll includes the last
committed offset. This if avoids duplication
+ final List<Object> tuple =
tupleBuilder.buildTuple(record);
+ final MessageId messageId = new MessageId(record,
tuple);
+
+ kafkaSpoutStreams.emit(collector, messageId);
// emits one tuple per record
+ LOG.debug("Emitted tuple [{}] for record [{}]", tuple,
record);
+ }
+ }
+ }
+ }
+
+ private void commitOffsetsForAckedTuples() {
+ // Find offsets that are ready to be committed for every topic
partition
+ final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets =
new HashMap<>();
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset :
acked.entrySet()) {
+ final OffsetAndMetadata nextCommitOffset =
tpOffset.getValue().findNextCommitOffset();
+ if (nextCommitOffset != null) {
+ nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
+ }
+ }
+
+ // Commit offsets that are ready to be committed for every topic
partition
+ if (!nextCommitOffsets.isEmpty()) {
+ kafkaConsumer.commitSync(nextCommitOffsets);
+ LOG.debug("Offsets successfully committed to Kafka [{}]",
nextCommitOffsets);
+ // Instead of iterating again, it would be possible to commit
and update the state for each TopicPartition
+ // in the prior loop, but the multiple network calls should be
more expensive than iterating twice over a small loop
+ for (Map.Entry<TopicPartition, OffsetEntry> tpOffset :
acked.entrySet()) {
+ final OffsetEntry offsetEntry = tpOffset.getValue();
+
offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+ }
+ } else {
+ LOG.trace("No offsets to commit. {}", toString());
+ }
+ }
+
+ // ======== Ack =======
+ @Override
+ public void ack(Object messageId) {
+ if (!consumerAutoCommitMode) { // Only need to keep track of
acked tuples if commits are not done automatically
+ final MessageId msgId = (MessageId) messageId;
+ acked.get(msgId.getTopicPartition()).add(msgId);
+ LOG.debug("Added acked message [{}] to list of messages to be
committed to Kafka", msgId);
+ }
+ }
+
+ // ======== Fail =======
+
+ @Override
+ public void fail(Object messageId) {
+ final MessageId msgId = (MessageId) messageId;
+ if (msgId.numFails() < maxRetries) {
+ msgId.incrementNumFails();
+ kafkaSpoutStreams.emit(collector, msgId);
+ LOG.debug("Retried tuple with message id [{}]", msgId);
+ } else { // limit to max number of retries
+ LOG.debug("Reached maximum number of retries. Message [{}]
being marked as acked.", msgId);
+ ack(msgId);
+ }
+ }
+
+ // ======== Activate / Deactivate / Close / Declare Outputs =======
+
+ @Override
+ public void activate() {
+ // Shouldn't have to do anything for now. If specific cases need
to be handled logic will go here
--- End diff --
activate gets called after deactivate is called. It can potentially go
back and forth between the two states multiple times. If we shutdown in
deactivate, which I think is fine, then we need to reinitialize in activate.
I assume that once the kafkaConsumer is closed poll will not work on it any
more.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---