Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r57972451 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaSpout<K, V> extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator(); + + // Storm + protected SpoutOutputCollector collector; + + // Kafka + private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; + private transient KafkaConsumer<K, V> kafkaConsumer; + private transient boolean consumerAutoCommitMode; + + + // Bookkeeping + private transient int maxRetries; // Max number of times a tuple is retried + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first realized by the spout upon activation + private transient KafkaSpoutRetryService retryService; // Class that has the logic to handle tuple failure + private transient Timer commitTimer; // timer == null for auto commit mode + private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. + // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() + + private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples + private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord + + private transient Map<TopicPartition, OffsetEntry> acked; // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate + private transient Set<KafkaSpoutMessageId> emitted; // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed + private transient Iterator<ConsumerRecord<K, V>> waitingToEmit; // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple() + private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet been committed + + + public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) { + this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration + this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams(); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + initialized = false; + + // Spout internals + this.collector = collector; + maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); + numUncommittedOffsets = 0; + + // Offset management + firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + + // Retries management + retryService = kafkaSpoutConfig.getRetryService(); + + // Tuples builder delegate + tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder(); + + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually + commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS); + } + + acked = new HashMap<>(); + emitted = new HashSet<>(); + waitingToEmit = Collections.emptyListIterator(); + + LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig); + } + + // =========== Consumer Rebalance Listener - On the same thread as the caller =========== + + private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + if (!consumerAutoCommitMode && initialized) { + initialized = false; + commitOffsetsForAckedTuples(); + } + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", + kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + + initialize(partitions); + } + + private void initialize(Collection<TopicPartition> partitions) { + if (!consumerAutoCommitMode) { + acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout + } + + retryService.remove(partitions); + + for (TopicPartition tp : partitions) { + final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp); + final long fetchOffset = doSeek(tp, committedOffset); + setAcked(tp, fetchOffset); + } + initialized = true; + LOG.debug("Initialization complete"); + } + + /** + * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset + */ + private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) { + long fetchOffset; + if (committedOffset != null) { // offset was committed for this TopicPartition + if (firstPollOffsetStrategy.equals(EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + fetchOffset = kafkaConsumer.position(tp); + } else if (firstPollOffsetStrategy.equals(LATEST)) { + kafkaConsumer.seekToEnd(tp); + fetchOffset = kafkaConsumer.position(tp); + } else { + // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset. + fetchOffset = committedOffset.offset() + 1; + kafkaConsumer.seek(tp, fetchOffset); + } + } else { // no commits have ever been done, so start at the beginning or end depending on the strategy + if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) { + kafkaConsumer.seekToBeginning(tp); + } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) { + kafkaConsumer.seekToEnd(tp); + } + fetchOffset = kafkaConsumer.position(tp); + } + return fetchOffset; + } + } + + private void setAcked(TopicPartition tp, long fetchOffset) { + // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off + if (!consumerAutoCommitMode && !acked.containsKey(tp)) { + acked.put(tp, new OffsetEntry(tp, fetchOffset)); + } + } + + // ======== Next Tuple ======= + + @Override + public void nextTuple() { + if (initialized) { + if (commit()) { + commitOffsetsForAckedTuples(); + } + + if (poll()) { + setWaitingToEmit(pollKafkaBroker()); + } + + if (waitingToEmit()) { + emit(); + } + } else { + LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); + } + } + + private boolean commit() { + return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + } + + private boolean poll() { + return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); + } + + private boolean waitingToEmit() { + return waitingToEmit != null && waitingToEmit.hasNext(); + } + + public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) { + List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>(); + for (TopicPartition tp : consumerRecords.partitions()) { + waitingToEmitList.addAll(consumerRecords.records(tp)); + } + waitingToEmit = waitingToEmitList.iterator(); + LOG.trace("Records waiting to be emitted {}", waitingToEmitList); + } + + // ======== poll ========= + private ConsumerRecords<K, V> pollKafkaBroker() { + doSeekRetriableTopicPartitions(); + + final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final int numPolledRecords = consumerRecords.count(); + LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets); + return consumerRecords; + } + + private void doSeekRetriableTopicPartitions() { + final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions(); + + for (TopicPartition rtp : retriableTopicPartitions) { + final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset(); + if (offsetAndMeta != null) { + kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle + } else { + kafkaConsumer.seekToEnd(rtp); // Seek to last committed offset + } + } + } + + // ======== emit ========= + private void emit() { + emitTupleIfNotEmitted(waitingToEmit.next()); + waitingToEmit.remove(); --- End diff -- @revans2 I apologize but I am really not following your thought. I am totally willing to accommodate what you have in mind, but I am not quite understanding what to do. I believe this should be a simple change. Can you please paste a code snippet, or some pseudo code, on how to do what you have in mind. I would be happy to integrate that. Thanks a lot.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---