[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15176281#comment-15176281
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r54774473
  
    --- Diff: 
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,454 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.TreeSet;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
    +
    +public class KafkaSpout<K, V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<MessageId> OFFSET_COMPARATOR = new 
OffsetComparator();
    +
    +    // Storm
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private KafkaConsumer<K, V> kafkaConsumer;
    +    private transient boolean consumerAutoCommitMode;
    +    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    +
    +    // Bookkeeping
    +    private KafkaSpoutStreams kafkaSpoutStreams;
    +    private KafkaTupleBuilder<K, V> tupleBuilder;
    +    private transient Timer timer;                                    // 
timer == null for auto commit mode
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed 
periodically when the timer expires, on consumer rebalance, or on 
close/deactivate
    +    private transient int maxRetries;                                 // 
Max number of times a tuple is retried
    +    private transient boolean initialized;          // Flag indicating 
that the spout is still undergoing initialization process.
    +                                                    // Initialization is 
only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
    +
    +
    +    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, 
KafkaSpoutStreams kafkaSpoutStreams, KafkaTupleBuilder<K, V> tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStreams = kafkaSpoutStreams;
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        initialized = false;
    +
    +        // Spout internals
    +        this.collector = collector;
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +
    +        // Offset management
    +        firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
    +        consumerAutoCommitMode = 
kafkaSpoutConfig.isConsumerAutoCommitMode();
    +
    +        if (!consumerAutoCommitMode) {     // If it is auto commit, no 
need to commit offsets manually
    +            timer = new Timer(500, 
kafkaSpoutConfig.getOffsetsCommitFreqMs(), TimeUnit.MILLISECONDS);
    +            acked = new HashMap<>();
    +        }
    +
    +        // Kafka consumer
    +        kafkaConsumer = new 
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
    +                kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
    +        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), 
new KafkaSpoutConsumerRebalanceListener());
    +        // Initial poll to get the consumer registration process going.
    +        // KafkaSpoutConsumerRebalanceListener will be called following 
this poll, upon partition registration
    +        kafkaConsumer.poll(0);
    +
    +        LOG.debug("Kafka Spout opened with the following configuration: 
{}", kafkaSpoutConfig);
    +    }
    +
    +    // =========== Consumer Rebalance Listener - On the same thread as the 
caller ===========
    +
    +    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
    +        @Override
    +        public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions revoked. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +            if (!consumerAutoCommitMode && initialized) {
    +                initialized = false;
    +                commitOffsetsForAckedTuples();
    +            }
    +        }
    +
    +        @Override
    +        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions reassignment. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +
    +            initialize(partitions);
    +        }
    +
    +        private void initialize(Collection<TopicPartition> partitions) {
    +            acked.keySet().retainAll(partitions);   // remove from acked 
all partitions that are no longer assigned to this spout
    +            for (TopicPartition tp : partitions) {
    +                final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
    +                final long fetchOffset = doSeek(tp, committedOffset);
    +                setAcked(tp, fetchOffset);
    +            }
    +            initialized = true;
    +            LOG.debug("Initialization complete");
    +        }
    +
    +        /**
    +         * sets the cursor to the location dictated by the first poll 
strategy and returns the fetch offset
    +         */
    +        private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
    +            long fetchOffset;
    +            if (committedOffset != null) {             // offset was 
committed for this TopicPartition
    +                if (firstPollOffsetStrategy.equals(EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else {
    +                    // do nothing - by default polling starts at the last 
committed offset
    +                    fetchOffset = committedOffset.offset();
    +                }
    +            } else {    // no commits have ever been done, so start at the 
beginning or end depending on the strategy
    +                if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                }
    +                fetchOffset = kafkaConsumer.position(tp);
    +            }
    +            return fetchOffset;
    +        }
    +    }
    +
    +    private void setAcked(TopicPartition tp, long fetchOffset) {
    +        // If this partition was previously assigned to this spout, leave 
the acked offsets as they were to resume where it left off
    +        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
    +            acked.put(tp, new OffsetEntry(tp, fetchOffset));
    +        }
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if (initialized) {
    +            if (commit()) {
    +                commitOffsetsForAckedTuples();
    +            } else {
    +                emitTuples(poll());
    +            }
    +        } else {
    +            LOG.debug("Spout not initialized. Not sending tuples until 
initialization completes");
    +        }
    +    }
    +
    +    private boolean commit() {
    +        return !consumerAutoCommitMode && timer.isExpiredResetOnTrue();    
// timer != null for non auto commit mode
    +    }
    +
    +    private ConsumerRecords<K, V> poll() {
    +        final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +        LOG.debug("Polled [{}] records from Kafka", 
consumerRecords.count());
    +        return consumerRecords;
    +    }
    +
    +    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
    +        for (TopicPartition tp : consumerRecords.partitions()) {
    +            final Iterable<ConsumerRecord<K, V>> records = 
consumerRecords.records(tp.topic());
    +
    +            for (final ConsumerRecord<K, V> record : records) {
    +                if (record.offset() == 0 || record.offset() > 
acked.get(tp).committedOffset) {      // The first poll includes the last 
committed offset. This if avoids duplication
    +                    final List<Object> tuple = 
tupleBuilder.buildTuple(record);
    +                    final MessageId messageId = new MessageId(record, 
tuple);
    +
    +                    kafkaSpoutStreams.emit(collector, messageId);          
 // emits one tuple per record
    +                    LOG.debug("Emitted tuple [{}] for record [{}]", tuple, 
record);
    +                }
    +            }
    +        }
    +    }
    +
    +    private void commitOffsetsForAckedTuples() {
    +        // Find offsets that are ready to be committed for every topic 
partition
    +        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = 
new HashMap<>();
    +        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : 
acked.entrySet()) {
    +            final OffsetAndMetadata nextCommitOffset = 
tpOffset.getValue().findNextCommitOffset();
    +            if (nextCommitOffset != null) {
    +                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
    +            }
    +        }
    +
    +        // Commit offsets that are ready to be committed for every topic 
partition
    +        if (!nextCommitOffsets.isEmpty()) {
    +            kafkaConsumer.commitSync(nextCommitOffsets);
    +            LOG.debug("Offsets successfully committed to Kafka [{}]", 
nextCommitOffsets);
    +            // Instead of iterating again, it would be possible to commit 
and update the state for each TopicPartition
    +            // in the prior loop, but the multiple network calls should be 
more expensive than iterating twice over a small loop
    +            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : 
acked.entrySet()) {
    +                final OffsetEntry offsetEntry = tpOffset.getValue();
    +                
offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
    +            }
    +        } else {
    +            LOG.trace("No offsets to commit. {}", toString());
    +        }
    +    }
    +
    +    // ======== Ack =======
    +    @Override
    +    public void ack(Object messageId) {
    +        if (!consumerAutoCommitMode) {  // Only need to keep track of 
acked tuples if commits are not done automatically
    +            final MessageId msgId = (MessageId) messageId;
    +            acked.get(msgId.getTopicPartition()).add(msgId);
    +            LOG.debug("Added acked message [{}] to list of messages to be 
committed to Kafka", msgId);
    +        }
    +    }
    +
    +    // ======== Fail =======
    +
    +    @Override
    +    public void fail(Object messageId) {
    +        final MessageId msgId = (MessageId) messageId;
    +        if (msgId.numFails() < maxRetries) {
    +            msgId.incrementNumFails();
    +            kafkaSpoutStreams.emit(collector, msgId);
    +            LOG.debug("Retried tuple with message id [{}]", msgId);
    +        } else { // limit to max number of retries
    +            LOG.debug("Reached maximum number of retries. Message [{}] 
being marked as acked.", msgId);
    +            ack(msgId);
    +        }
    +    }
    +
    +    // ======== Activate / Deactivate / Close / Declare Outputs =======
    +
    +    @Override
    +    public void activate() {
    +        // Shouldn't have to do anything for now. If specific cases need 
to be handled logic will go here
    --- End diff --
    
    activate() gets called after deactivate() is called, and the spout can 
potentially go back and forth between the two states multiple times.  If we shut 
down in deactivate(), which I think is fine, then we need to reinitialize in activate().
    
    I assume that once the kafkaConsumer is closed, poll() will not work on it 
any more.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to