[
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174701#comment-15174701
]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r54661378
--- Diff: external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K,V> extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+    private static final Comparator<MessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+    // Storm
+    protected SpoutOutputCollector collector;
+
+    // Kafka
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private KafkaConsumer<K, V> kafkaConsumer;
+    private transient boolean consumerAutoCommitMode;
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+
+    // Bookkeeping
+    private KafkaSpoutStreams kafkaSpoutStreams;
+    private KafkaTupleBuilder<K,V> tupleBuilder;
+    private transient Timer timer;                              // timer == null for auto commit mode
+    private transient Map<TopicPartition, OffsetEntry> acked;   // emitted tuples that were successfully acked. These tuples will be committed by the commitOffsetsTask or on consumer rebalance
+    private transient int maxRetries;                           // Max number of times a tuple is retried
+    private transient boolean initialized;                      // Flag indicating that the spout is still undergoing its initialization process.
+                                                                // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+
+
+    public KafkaSpout(KafkaSpoutConfig<K,V> kafkaSpoutConfig, KafkaSpoutStreams kafkaSpoutStreams, KafkaTupleBuilder<K,V> tupleBuilder) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;       // Pass in configuration
+        this.kafkaSpoutStreams = kafkaSpoutStreams;
+        this.tupleBuilder = tupleBuilder;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        initialized = false;
+
+        // Spout internals
+        this.collector = collector;
+        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+
+        // Offset management
+        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+        if (!consumerAutoCommitMode) {      // In auto commit mode there is no need to commit offsets manually
+            timer = new Timer(kafkaSpoutConfig.getOffsetsCommitFreqMs(), 500, TimeUnit.MILLISECONDS);
+            acked = new HashMap<>();
+        }
+
+        // Kafka consumer
+        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+        kafkaConsumer.poll(0);
+
+        LOG.debug("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig.toString());
+    }
+
+    // ======== Next Tuple =======
+
+    @Override
+    public void nextTuple() {
+        if (initialized) {
+            if (commit()) {
+                commitOffsetsForAckedTuples();
+            } else {
+                emitTuples(poll());
+            }
+        } else {
+            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+        }
+    }
+
+    private boolean commit() {
+        return !consumerAutoCommitMode && timer.expired();      // timer != null for non auto commit mode
+    }
+
+    private ConsumerRecords<K, V> poll() {
+        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+        LOG.debug("Polled [{}] records from Kafka", consumerRecords.count());
+        return consumerRecords;
+    }
+
+    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());     // TODO Decide if we want to give flexibility to emit/poll either per topic or per partition
+
+            for (final ConsumerRecord<K, V> record : records) {
+                if (record.offset() == 0 || record.offset() > acked.get(tp).committedOffset) {      // The first poll includes the last committed offset; this check avoids emitting duplicates
+                    final List<Object> tuple = tupleBuilder.buildTuple(record);
+                    final MessageId messageId = new MessageId(record, tuple);                       // TODO don't create a MessageId in non-acking mode. Should we support non-acking mode?
+
+                    kafkaSpoutStreams.emit(collector, messageId);                                   // emits one tuple per record
+                    LOG.debug("Emitted tuple [{}] for record [{}]", tuple, record);
+                }
+            }
+        }
+    }
+
+    // ======== Ack =======
+    @Override
+    public void ack(Object messageId) {
+        final MessageId msgId = (MessageId) messageId;
+        final TopicPartition tp = msgId.getTopicPartition();
+
+        if (!consumerAutoCommitMode) {      // Only need to keep track of acked tuples if commits are not done automatically
+            acked.get(tp).add(msgId);
+            LOG.debug("Adding acked message [{}] to the list of messages to be committed to Kafka", msgId);
+        }
+    }
+
+    // ======== Fail =======
+
+    @Override
+    public void fail(Object messageId) {
+        final MessageId msgId = (MessageId) messageId;
+        if (msgId.numFails() < maxRetries) {
+            msgId.incrementNumFails();
+            kafkaSpoutStreams.emit(collector, msgId);
+            LOG.debug("Retried tuple with message id [{}]", messageId);
+        } else {        // limit to max number of retries
+            LOG.debug("Reached maximum number of retries. Message being marked as acked.");
+            ack(msgId);
+        }
+    }
+
+    // ======== Activate / Deactivate =======
+
+    @Override
+    public void activate() {
+        // Shouldn't have to do anything for now. If specific cases need to be handled, logic will go here
+    }
+
+    @Override
+    public void deactivate() {
+        if (!consumerAutoCommitMode) {
+            commitOffsetsForAckedTuples();
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            kafkaConsumer.wakeup();
+            if (!consumerAutoCommitMode) {
+                commitOffsetsForAckedTuples();
+            }
+        } finally {
+            // release resources
+            kafkaConsumer.close();
+        }
+    }
+
+    // TODO must declare multiple output streams
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        kafkaSpoutStreams.declareOutputFields(declarer);
+    }
+
+    // ====== Private helper methods ======
+
+    private void commitOffsetsForAckedTuples() {
+        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
+
+        try {
+            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+                final OffsetAndMetadata offsetAndMetadata = tpOffset.getValue().findNextCommitOffset();
+                if (offsetAndMetadata != null) {
+                    nextCommitOffsets.put(tpOffset.getKey(), offsetAndMetadata);
+                }
+            }
+
+            if (!nextCommitOffsets.isEmpty()) {
+                kafkaConsumer.commitSync(nextCommitOffsets);
+                LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
+                // Instead of iterating again, we could commit and update the state for each TopicPartition in the prior loop,
+                // but the multiple network calls should be more expensive than iterating twice over a small loop
+                for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+                    final OffsetEntry offsetEntry = tpOffset.getValue();
+                    offsetEntry.updateAckedState(nextCommitOffsets.get(tpOffset.getKey()));
+                }
+            } else {
+                LOG.trace("No offsets to commit. {}", toString());
+            }
+        } catch (Exception e) {
+            LOG.error("Exception occurred while committing offsets of acked tuples to Kafka", e);
--- End diff ---
Done. Catch removed.
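For context, with the catch removed a failed commitSync simply propagates to the caller. A minimal sketch of how the method body would then read, reusing the fields and the OffsetEntry helper referenced in the diff above (OffsetEntry itself is not shown in this excerpt):

    private void commitOffsetsForAckedTuples() {
        // Find the next offset to commit for each partition that has acked tuples
        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
            final OffsetAndMetadata offsetAndMetadata = tpOffset.getValue().findNextCommitOffset();
            if (offsetAndMetadata != null) {
                nextCommitOffsets.put(tpOffset.getKey(), offsetAndMetadata);
            }
        }

        if (!nextCommitOffsets.isEmpty()) {
            // No catch block: any exception thrown by commitSync now propagates to the caller
            kafkaConsumer.commitSync(nextCommitOffsets);
            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
                tpOffset.getValue().updateAckedState(nextCommitOffsets.get(tpOffset.getKey()));
            }
        } else {
            LOG.trace("No offsets to commit. {}", toString());
        }
    }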
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
>
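As background for the story above, the manual-commit pattern of the new consumer API that the spout builds on looks roughly like the standalone sketch below. The broker address, group id, and topic are placeholders; this is illustrative only, not code from the pull request:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ManualCommitExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
            props.put("group.id", "storm-kafka-spout-demo");    // placeholder group id
            props.put("enable.auto.commit", "false");           // commit manually, as the spout does
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Arrays.asList("test-topic"));    // placeholder topic

                ConsumerRecords<String, String> records = consumer.poll(200);
                Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Kafka expects the offset of the next record to consume
                    toCommit.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }
                if (!toCommit.isEmpty()) {
                    consumer.commitSync(toCommit);
                }
            }
        }
    }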
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)