[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196241#comment-15196241 ]
ASF GitHub Bot commented on STORM-822: -------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r56239098 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaSpout<K, V> extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator(); + + // Storm + protected SpoutOutputCollector collector; + + // Kafka + private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; + private transient KafkaConsumer<K, V> kafkaConsumer; + private transient boolean consumerAutoCommitMode; + + + // Bookkeeping + private KafkaSpoutStreams kafkaSpoutStreams; + private KafkaSpoutTupleBuilder<K, V> tupleBuilder; + private transient Timer commitTimer; // timer == null for auto commit mode + private transient Timer logTimer; + private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. 
These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
+ private transient int maxRetries; // Max number of times a tuple is retried
+ private transient boolean initialized; // Flag indicating that the spout is still undergoing the initialization process.
+ // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+ private transient long numUncommittedOffsets; // Number of offsets that have been polled and emitted but not yet committed
+ private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private transient PollStrategy pollStrategy;
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
+ this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ initialized = false;
+
+ // Spout internals
+ this.collector = collector;
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+ numUncommittedOffsets = 0;
+ logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
+
+ // Offset management
+ firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+ pollStrategy = kafkaSpoutConfig.getPollStrategy();
+ consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+ if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually
+ commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+ acked = new HashMap<>();
+ }
+
+ LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+ }
+
+ // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+ private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ if (!consumerAutoCommitMode && initialized) {
+ initialized = false;
+ commitOffsetsForAckedTuples();
+ }
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+ kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+ initialize(partitions);
+ }
+
+ private void initialize(Collection<TopicPartition> partitions) {
+ if (!consumerAutoCommitMode) {
+ acked.keySet().retainAll(partitions); // remove from acked all partitions that are no longer assigned to this spout
+ }
+
+ for (TopicPartition tp : partitions) {
+ final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ final long fetchOffset = doSeek(tp, committedOffset);
+ setAcked(tp, fetchOffset);
+ }
+ initialized = true;
+ LOG.debug("Initialization complete");
+ }
+
+ /**
+ * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ */
+ private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ long fetchOffset;
+ if (committedOffset != null) { // offset was committed for this TopicPartition
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else {
+ // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+ fetchOffset = committedOffset.offset() + 1;
+ kafkaConsumer.seek(tp, fetchOffset);
+ }
+ } else { // no commits have ever been done, so start at the beginning or end depending on the strategy
+ if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ }
+ fetchOffset = kafkaConsumer.position(tp);
+ }
+ return fetchOffset;
+ }
+ }
+
+ private void setAcked(TopicPartition tp, long fetchOffset) {
+ // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+ if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ }
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ } else if (poll()) {
+ emitTuples(pollKafkaBroker());
+ } else if (logTimer.isExpiredResetOnTrue()) {
+ log();
+ }
+ } else {
+ LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+ }
+ }
+
+ private void log() {
+ switch(pollStrategy) {
+ case STREAM:
+ LOG.trace("Reached the maximum number of uncommitted records [{}]. " +
+ "No more polls will occur until a sequence of commits sets the count under the [{}] threshold",
+ numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
+ break;
+ case BATCH:
+ LOG.trace("No more polls will occur until the last batch completes.
[{}] emitted tuples pending", numUncommittedOffsets); + break; + default: + throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); + } + + } + + // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory + private boolean poll() { + switch(pollStrategy) { + case STREAM: + return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets(); + case BATCH: + return consumerAutoCommitMode || numUncommittedOffsets <= 0; + default: + throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy); + } + } + + private boolean commit() { + return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode + } + + private ConsumerRecords<K, V> pollKafkaBroker() { + final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + final int numPolledRecords = consumerRecords.count(); + numUncommittedOffsets+= numPolledRecords; + LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets); + return consumerRecords; + } + + private void emitTuples(ConsumerRecords<K, V> consumerRecords) { + for (TopicPartition tp : consumerRecords.partitions()) { + final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic()); + + for (final ConsumerRecord<K, V> record : records) { + final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams); + final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple); + + kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record + LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); + } + } + } + + private void commitOffsetsForAckedTuples() { + // Find offsets that are ready to be committed for every topic partition + final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(); + if (nextCommitOffset != null) { + nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset); + } + } + + // Commit offsets that are ready to be committed for every topic partition + if (!nextCommitOffsets.isEmpty()) { + kafkaConsumer.commitSync(nextCommitOffsets); + LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); + // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition + // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetEntry offsetEntry = tpOffset.getValue(); + offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey())); + } + } else { + LOG.trace("No offsets to commit. {}", this); + } + } + + // ======== Ack ======= + + @Override + public void ack(Object messageId) { + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + acked.get(msgId.getTopicPartition()).add(msgId); + LOG.trace("Acked message [{}]. 
Messages acked and pending commit [{}]", msgId, acked); + } + } + + // ======== Fail ======= + + @Override + public void fail(Object messageId) { // TODO poll all tuples after the failed tuple + final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId; + if (msgId.numFails() < maxRetries) { + msgId.incrementNumFails(); + kafkaSpoutStreams.emit(collector, msgId); + LOG.trace("Retried tuple with message id [{}]", msgId); + } else { // limit to max number of retries + LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId); + ack(msgId); + } + } + + // ======== Activate / Deactivate / Close / Declare Outputs ======= + + @Override + public void activate() { + subscribeKafkaConsumer(); + } + + private void subscribeKafkaConsumer() { + kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener()); + // Initial poll to get the consumer registration process going. + // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration + kafkaConsumer.poll(0); + } + + @Override + public void deactivate() { + shutdown(); + } + + @Override + public void close() { + shutdown(); + } + + private void shutdown() { + try { + kafkaConsumer.wakeup(); + if (!consumerAutoCommitMode) { + commitOffsetsForAckedTuples(); + } + } finally { + //remove resources + kafkaConsumer.close(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + kafkaSpoutStreams.declareOutputFields(declarer); + } + + @Override + public String toString() { + return "{acked=" + acked + "} "; + } + + // ======= Offsets Commit Management ========== + + private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> { + public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) { + return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1; + } + } + + /** + * This class is not thread safe + */ + private class OffsetEntry { + private final TopicPartition tp; + private final long initialFetchOffset; /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset. + * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */ + private long committedOffset; // last offset committed to Kafka. Initially it is set to fetchOffset - 1 + private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR); // acked messages sorted by ascending order of offset + + public OffsetEntry(TopicPartition tp, long initialFetchOffset) { + this.tp = tp; + this.initialFetchOffset = initialFetchOffset; + this.committedOffset = initialFetchOffset - 1; + LOG.debug("Created OffsetEntry. {}", this); + } + + public void add(KafkaSpoutMessageId msgId) { // O(Log N) + ackedMsgs.add(msgId); + } + + /** + * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit. 
+ */
+ public OffsetAndMetadata findNextCommitOffset() {
+ boolean found = false;
+ long currOffset;
+ long nextCommitOffset = committedOffset;
+ KafkaSpoutMessageId nextCommitMsg = null; // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+ for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) { // complexity is that of a linear scan on a TreeMap
+ if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) { // found the next offset to commit
+ found = true;
+ nextCommitMsg = currAckedMsg;
+ nextCommitOffset = currOffset;
+ LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
+ } else if (currAckedMsg.offset() > nextCommitOffset + 1) { // offset found is not continuous with the offsets listed to go in the next commit, so stop the search
+ LOG.debug("Non-continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+ break;
+ } else {
+ LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+ break;
+ }
+ }
+
+ OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+ if (found) {
+ nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+ LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+ } else {
+ LOG.debug("No offsets ready to commit. {}", this);
+ }
+ return nextCommitOffsetAndMetadata;
+ }
+
+ /**
+ * Marks an offset as committed. This method has side effects - it sets the internal state in such a way that future
+ * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
+ *
+ * @param committedOffset offset to be marked as committed
+ */
+ public void commit(OffsetAndMetadata committedOffset) {
+ if (committedOffset != null) {
+ /*final long numCommittedOffsets = this.committedOffset < initialFetchOffset
+ ? committedOffset.offset() - this.committedOffset + 1 // +1 because fetchOffset is pointing to the first uncommitted offset
+ : committedOffset.offset() - this.committedOffset;
+*/
+ final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+
+ this.committedOffset = committedOffset.offset();
+ for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
+ if (iterator.next().offset() <= committedOffset.offset()) {
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ numUncommittedOffsets -= numCommittedOffsets;
+ }
+ LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+ }
+
+ public boolean isEmpty() {
+ return ackedMsgs.isEmpty();
+ }
+
+ /*@Override
+ public String toString() {
+ return "OffsetEntry{" +
+ "topic-partition=" + tp +
+ ", committedOffset=" + committedOffset +
+ ", ackedMsgs=" + ackedMsgs +
+ '}';
+ }*/
--- End diff --

Again, let's remove the commented-out code.

> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
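
For readers following this review: the core of the patch above is OffsetEntry.findNextCommitOffset(), which walks the acked offsets in ascending order and commits only the longest run that is contiguous with the last committed offset, so a gap left by a still-pending tuple holds back every later offset. Below is a minimal, self-contained sketch of that walk; the class and method names are hypothetical, offsets are reduced to plain longs, and the initialFetchOffset special case is omitted, so this is an illustration rather than code from the PR:

    import java.util.NavigableSet;
    import java.util.TreeSet;

    // Hypothetical sketch of the contiguous-offset walk in findNextCommitOffset()
    public class ContiguousCommitSketch {

        // Returns the highest offset that is safe to commit: the end of the run of
        // acked offsets contiguous with committedOffset, or committedOffset itself
        // when nothing new is committable yet.
        static long findNextCommitOffset(NavigableSet<Long> ackedOffsets, long committedOffset) {
            long nextCommit = committedOffset;
            for (long offset : ackedOffsets) {       // ascending order, like the TreeSet of acked messages
                if (offset == nextCommit + 1) {
                    nextCommit = offset;             // contiguous: extend the committable run
                } else if (offset > nextCommit + 1) {
                    break;                           // gap: later acks must wait for the missing offset
                }                                    // offsets <= nextCommit are already covered; skip
            }
            return nextCommit;
        }

        public static void main(String[] args) {
            NavigableSet<Long> acked = new TreeSet<>();
            acked.add(5L); acked.add(6L); acked.add(8L); // offset 7 not yet acked
            System.out.println(findNextCommitOffset(acked, 4L)); // prints 6; 8 waits for 7
        }
    }

As in the patch, the value tracked here is the last processed offset rather than Kafka's next-offset-to-consume convention; the spout compensates when it seeks, per the "+1 to point fetch to the first uncommitted offset" comment in doSeek().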