[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174701#comment-15174701 ]
ASF GitHub Bot commented on STORM-822: -------------------------------------- Github user hmcl commented on a diff in the pull request: https://github.com/apache/storm/pull/1131#discussion_r54661378 --- Diff: external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java --- @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST; +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST; + +public class KafkaSpout<K,V> extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); + private static final Comparator<MessageId> OFFSET_COMPARATOR = new OffsetComparator(); + + // Storm + protected SpoutOutputCollector collector; + + // Kafka + private final KafkaSpoutConfig<K, V> kafkaSpoutConfig; + private KafkaConsumer<K, V> kafkaConsumer; + private transient boolean consumerAutoCommitMode; + private transient FirstPollOffsetStrategy firstPollOffsetStrategy; + + // Bookkeeping + private KafkaSpoutStreams kafkaSpoutStreams; + 
private KafkaTupleBuilder<K,V> tupleBuilder; + private transient Timer timer; // timer == null for auto commit mode + private transient Map<TopicPartition, OffsetEntry> acked; // emitted tuples that were successfully acked. These tuples will be committed by the commitOffsetsTask or on consumer rebalance + private transient int maxRetries; // Max number of times a tuple is retried + private transient boolean initialized; // Flag indicating that the spout is still undergoing initialization process. + // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned() + + + public KafkaSpout(KafkaSpoutConfig<K,V> kafkaSpoutConfig, KafkaSpoutStreams kafkaSpoutStreams, KafkaTupleBuilder<K,V> tupleBuilder) { + this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration + this.kafkaSpoutStreams = kafkaSpoutStreams; + this.tupleBuilder = tupleBuilder; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + initialized = false; + + // Spout internals + this.collector = collector; + maxRetries = kafkaSpoutConfig.getMaxTupleRetries(); + + // Offset management + firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy(); + consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode(); + + if (!consumerAutoCommitMode) { // If it is auto commit, no need to commit offsets manually + timer = new Timer(kafkaSpoutConfig.getOffsetsCommitFreqMs(), 500, TimeUnit.MILLISECONDS); + acked = new HashMap<>(); + } + + // Kafka consumer + kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), + kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); + kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener()); + // Initial poll to get the consumer registration process going. 
+ // KafkaSpoutConsumerRebalanceListener will be called following this poll upon partition registration + kafkaConsumer.poll(0); + + LOG.debug("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig.toString()); + } + + // ======== Next Tuple ======= + + @Override + public void nextTuple() { + if (initialized) { + if(commit()) { + commitOffsetsForAckedTuples(); + } else { + emitTuples(poll()); + } + } else { + LOG.debug("Spout not initialized. Not sending tuples until initialization completes"); + } + } + + private boolean commit() { + return !consumerAutoCommitMode && timer.expired(); // timer != null for non auto commit mode + } + + private ConsumerRecords<K, V> poll() { + final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs()); + LOG.debug("Polled [{}] records from Kafka", consumerRecords.count()); + return consumerRecords; + } + + private void emitTuples(ConsumerRecords<K, V> consumerRecords) { + for (TopicPartition tp : consumerRecords.partitions()) { + final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic()); // TODO Decide if want to give flexibility to emit/poll either per topic or per partition + + for (final ConsumerRecord<K, V> record : records) { + if (record.offset() == 0 || record.offset() > acked.get(tp).committedOffset) { // The first poll includes the last committed offset. This if avoids duplication + final List<Object> tuple = tupleBuilder.buildTuple(record); + final MessageId messageId = new MessageId(record, tuple); // TODO don't create message for non acking mode. Should we support non acking mode? 
+ + kafkaSpoutStreams.emit(collector, messageId); // emits one tuple per record + LOG.debug("Emitted tuple [{}] for record [{}]", tuple, record); + } + } + } + } + + // ======== Ack ======= + @Override + public void ack(Object messageId) { + final MessageId msgId = (MessageId) messageId; + final TopicPartition tp = msgId.getTopicPartition(); + + if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically + acked.get(tp).add(msgId); + LOG.debug("Adding acked message to [{}] to list of messages to be committed to Kafka", msgId); + } + } + + // ======== Fail ======= + + @Override + public void fail(Object messageId) { + final MessageId msgId = (MessageId) messageId; + if (msgId.numFails() < maxRetries) { + msgId.incrementNumFails(); + kafkaSpoutStreams.emit(collector, msgId); + LOG.debug("Retried tuple with message id [{}]", messageId); + } else { // limit to max number of retries + LOG.debug("Reached maximum number of retries. Message being marked as acked."); + ack(msgId); + } + } + + // ======== Activate / Deactivate ======= + + @Override + public void activate() { + // Shouldn't have to do anything for now. 
If specific cases need to be handled logic will go here + } + + @Override + public void deactivate() { + if(!consumerAutoCommitMode) { + commitOffsetsForAckedTuples(); + } + } + + @Override + public void close() { + try { + kafkaConsumer.wakeup(); + if(!consumerAutoCommitMode) { + commitOffsetsForAckedTuples(); + } + } finally { + //remove resources + kafkaConsumer.close(); + } + } + + // TODO must declare multiple output streams + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + kafkaSpoutStreams.declareOutputFields(declarer); + } + + // ====== Private helper methods ====== + + private void commitOffsetsForAckedTuples() { + final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>(); + + try { + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetAndMetadata offsetAndMetadata = tpOffset.getValue().findNextCommitOffset(); + if (offsetAndMetadata != null) { + nextCommitOffsets.put(tpOffset.getKey(), offsetAndMetadata); + } + } + + if (!nextCommitOffsets.isEmpty()) { + kafkaConsumer.commitSync(nextCommitOffsets); + LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets); + // Instead of iterating again, we could commit and update state for each TopicPartition in the prior loop, + // but the multiple network calls should be more expensive than iterating twice over a small loop + for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) { + final OffsetEntry offsetEntry = tpOffset.getValue(); + offsetEntry.updateAckedState(nextCommitOffsets.get(tpOffset.getKey())); + } + } else { + LOG.trace("No offsets to commit. {}", toString()); + } + } catch (Exception e) { + LOG.error("Exception occurred while committing to Kafka offsets of acked tuples", e); --- End diff -- Done. Catch removed. 
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to > reduce dependencies and use long term supported kafka apis > -------------------------------------------------------------------------------------------------------------------------------------- > > Key: STORM-822 > URL: https://issues.apache.org/jira/browse/STORM-822 > Project: Apache Storm > Issue Type: Story > Components: storm-kafka > Reporter: Thomas Becker > Assignee: Hugo Louro > -- This message was sent by Atlassian JIRA (v6.3.4#6332)