[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218968#comment-15218968 ]
ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r57972451

--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+    // Storm
+    protected SpoutOutputCollector collector;
+
+    // Kafka
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private transient KafkaConsumer<K, V> kafkaConsumer;
+    private transient boolean consumerAutoCommitMode;
+
+
+    // Bookkeeping
+    private transient int maxRetries;                                   // Max number of times a tuple is retried
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
+    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
+    private transient Timer commitTimer;                                // timer == null for auto commit mode
+    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
+                                                                        // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+
+    private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
+    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
+
+    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
+
+
+    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
+        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        initialized = false;
+
+        // Spout internals
+        this.collector = collector;
+        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+        numUncommittedOffsets = 0;
+
+        // Offset management
+        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+        // Retries management
+        retryService = kafkaSpoutConfig.getRetryService();
+
+        // Tuples builder delegate
+        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
+        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
+            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+        }
+
+        acked = new HashMap<>();
+        emitted = new HashSet<>();
+        waitingToEmit = Collections.emptyListIterator();
+
+        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+    }
+
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            if (!consumerAutoCommitMode && initialized) {
+                initialized = false;
+                commitOffsetsForAckedTuples();
+            }
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+            initialize(partitions);
+        }
+
+        private void initialize(Collection<TopicPartition> partitions) {
+            if (!consumerAutoCommitMode) {
+                acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
+            }
+
+            retryService.remove(partitions);
+
+            for (TopicPartition tp : partitions) {
+                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+                final long fetchOffset = doSeek(tp, committedOffset);
+                setAcked(tp, fetchOffset);
+            }
+            initialized = true;
+            LOG.debug("Initialization complete");
+        }
+
+        /**
+         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+         */
+        private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+            long fetchOffset;
+            if (committedOffset != null) {             // offset was committed for this TopicPartition
+                if (firstPollOffsetStrategy.equals(EARLIEST)) {
+                    kafkaConsumer.seekToBeginning(tp);
+                    fetchOffset = kafkaConsumer.position(tp);
+                } else if (firstPollOffsetStrategy.equals(LATEST)) {
+                    kafkaConsumer.seekToEnd(tp);
+                    fetchOffset = kafkaConsumer.position(tp);
+                } else {
+                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+                    fetchOffset = committedOffset.offset() + 1;
+                    kafkaConsumer.seek(tp, fetchOffset);
+                }
+            } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
+                if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+                    kafkaConsumer.seekToBeginning(tp);
+                } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+                    kafkaConsumer.seekToEnd(tp);
+                }
+                fetchOffset = kafkaConsumer.position(tp);
+            }
+            return fetchOffset;
+        }
+    }
+
+    private void setAcked(TopicPartition tp, long fetchOffset) {
+        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+        }
+    }
+
+    // ======== Next Tuple =======
+
+    @Override
+    public void nextTuple() {
+        if (initialized) {
+            if (commit()) {
+                commitOffsetsForAckedTuples();
+            }
+
+            if (poll()) {
+                setWaitingToEmit(pollKafkaBroker());
+            }
+
+            if (waitingToEmit()) {
+                emit();
+            }
+        } else {
+            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+        }
+    }
+
+    private boolean commit() {
+        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    }
+
+    private boolean poll() {
+        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+    }
+
+    private boolean waitingToEmit() {
+        return waitingToEmit != null && waitingToEmit.hasNext();
+    }
+
+    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            waitingToEmitList.addAll(consumerRecords.records(tp));
+        }
+        waitingToEmit = waitingToEmitList.iterator();
+        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
+    }
+
+    // ======== poll =========
+    private ConsumerRecords<K, V> pollKafkaBroker() {
+        doSeekRetriableTopicPartitions();
+
+        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+        final int numPolledRecords = consumerRecords.count();
+        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+        return consumerRecords;
+    }
+
+    private void doSeekRetriableTopicPartitions() {
+        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+        for (TopicPartition rtp : retriableTopicPartitions) {
+            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+            if (offsetAndMeta != null) {
+                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
+            } else {
+                kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
+            }
+        }
+    }
+
+    // ======== emit =========
+    private void emit() {
+        emitTupleIfNotEmitted(waitingToEmit.next());
+        waitingToEmit.remove();

--- End diff --

@revans2 I apologize, but I am really not following your reasoning. I am entirely willing to accommodate what you have in mind, but I do not quite understand what to do. I believe this should be a simple change. Could you please paste a code snippet, or some pseudocode, showing what you have in mind? I would be happy to integrate it. Thanks a lot.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
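[Editor's note] The exchange above concerns emit(), which hands out exactly one polled record per nextTuple() invocation and drops it from the waitingToEmit buffer via Iterator.remove(). The stand-alone Java sketch below isolates that drain-one-element-per-call pattern so the mechanics are easier to see in isolation. It is purely illustrative: the class and method names (OneRecordPerCallDrainer, setBatch, hasPending, emitOne) are hypothetical, are not part of the Storm codebase, and this is not the change the reviewer was asking for.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    // Hypothetical, stand-alone illustration of the pattern used by KafkaSpout above:
    // a polled batch is buffered once, then drained one element per call (as nextTuple()
    // does), with Iterator.remove() discarding each element after it has been handed out.
    public class OneRecordPerCallDrainer<T> {
        private Iterator<T> waitingToEmit = Collections.emptyIterator();

        // Analogous to setWaitingToEmit(ConsumerRecords): copy the polled batch into a
        // list and keep an iterator over it.
        public void setBatch(List<T> batch) {
            waitingToEmit = new LinkedList<>(batch).iterator();
        }

        // Analogous to the waitingToEmit() check in the spout.
        public boolean hasPending() {
            return waitingToEmit != null && waitingToEmit.hasNext();
        }

        // Analogous to emit(): hand out exactly one element per call and drop it
        // from the buffer.
        public T emitOne() {
            T next = waitingToEmit.next();
            waitingToEmit.remove();
            return next;
        }

        public static void main(String[] args) {
            OneRecordPerCallDrainer<String> drainer = new OneRecordPerCallDrainer<>();
            drainer.setBatch(Arrays.asList("record-0", "record-1", "record-2"));
            // Each loop iteration stands in for one nextTuple() call.
            while (drainer.hasPending()) {
                System.out.println("emitting " + drainer.emitOne());
            }
        }
    }

Copying the batch into a LinkedList before iterating keeps each Iterator.remove() call O(1) while letting the caller drain the batch incrementally across many nextTuple()-style invocations.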