[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218968#comment-15218968
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r57972451
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,547 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.Set;
    +import java.util.TreeSet;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
    +
    +public class KafkaSpout<K, V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR 
= new OffsetComparator();
    +
    +    // Storm
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private transient KafkaConsumer<K, V> kafkaConsumer;
    +    private transient boolean consumerAutoCommitMode;
    +
    +
    +    // Bookkeeping
    +    private transient int maxRetries;                                   // 
Max number of times a tuple is retried
    +    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // 
Strategy to determine the fetch offset of the first realized by the spout upon 
activation
    +    private transient KafkaSpoutRetryService retryService;              // 
Class that has the logic to handle tuple failure
    +    private transient Timer commitTimer;                                // 
timer == null for auto commit mode
    +    private transient boolean initialized;                              // 
Flag indicating that the spout is still undergoing initialization process.
    +    // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
    +
    +    private KafkaSpoutStreams kafkaSpoutStreams;                        // 
Object that wraps all the logic to declare output fields and emit tuples
    +    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // 
Object that contains the logic to build tuples for each ConsumerRecord
    +
    +    private transient Map<TopicPartition, OffsetEntry> acked;           // 
Tuples that were successfully acked. These tuples will be committed 
periodically when the commit timer expires, after consumer rebalance, or on 
close/deactivate
    +    private transient Set<KafkaSpoutMessageId> emitted;                 // 
Tuples that have been emitted but that are "on the wire", i.e. pending being 
acked or failed
    +    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;        
 // Records that have been polled and are queued to be emitted in the 
nextTuple() call. One record is emitted per nextTuple()
    +    private transient long numUncommittedOffsets;                       // 
Number of offsets that have been polled and emitted but not yet been committed
    +
    +
    +    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        initialized = false;
    +
    +        // Spout internals
    +        this.collector = collector;
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +        numUncommittedOffsets = 0;
    +
    +        // Offset management
    +        firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
    +        consumerAutoCommitMode = 
kafkaSpoutConfig.isConsumerAutoCommitMode();
    +
    +        // Retries management
    +        retryService = kafkaSpoutConfig.getRetryService();
    +
    +        // Tuples builder delegate
    +        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
    +
    +        if (!consumerAutoCommitMode) {     // If it is auto commit, no 
need to commit offsets manually
    +            commitTimer = new Timer(500, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
    +        }
    +
    +        acked = new HashMap<>();
    +        emitted = new HashSet<>();
    +        waitingToEmit = Collections.emptyListIterator();
    +
    +        LOG.info("Kafka Spout opened with the following configuration: 
{}", kafkaSpoutConfig);
    +    }
    +
    +    // =========== Consumer Rebalance Listener - On the same thread as the 
caller ===========
    +
    +    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
    +        @Override
    +        public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions revoked. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +            if (!consumerAutoCommitMode && initialized) {
    +                initialized = false;
    +                commitOffsetsForAckedTuples();
    +            }
    +        }
    +
    +        @Override
    +        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions reassignment. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +
    +            initialize(partitions);
    +        }
    +
    +        private void initialize(Collection<TopicPartition> partitions) {
    +            if (!consumerAutoCommitMode) {
    +                acked.keySet().retainAll(partitions);   // remove from 
acked all partitions that are no longer assigned to this spout
    +            }
    +
    +            retryService.remove(partitions);
    +
    +            for (TopicPartition tp : partitions) {
    +                final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
    +                final long fetchOffset = doSeek(tp, committedOffset);
    +                setAcked(tp, fetchOffset);
    +            }
    +            initialized = true;
    +            LOG.debug("Initialization complete");
    +        }
    +
    +        /**
    +         * sets the cursor to the location dictated by the first poll 
strategy and returns the fetch offset
    +         */
    +        private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
    +            long fetchOffset;
    +            if (committedOffset != null) {             // offset was 
committed for this TopicPartition
    +                if (firstPollOffsetStrategy.equals(EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else {
    +                    // By default polling starts at the last committed 
offset. +1 to point fetch to the first uncommitted offset.
    +                    fetchOffset = committedOffset.offset() + 1;
    +                    kafkaConsumer.seek(tp, fetchOffset);
    +                }
    +            } else {    // no commits have ever been done, so start at the 
beginning or end depending on the strategy
    +                if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                }
    +                fetchOffset = kafkaConsumer.position(tp);
    +            }
    +            return fetchOffset;
    +        }
    +    }
    +
    +    private void setAcked(TopicPartition tp, long fetchOffset) {
    +        // If this partition was previously assigned to this spout, leave 
the acked offsets as they were to resume where it left off
    +        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
    +            acked.put(tp, new OffsetEntry(tp, fetchOffset));
    +        }
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if (initialized) {
    +            if (commit()) {
    +                commitOffsetsForAckedTuples();
    +            }
    +
    +            if (poll()) {
    +                setWaitingToEmit(pollKafkaBroker());
    +            }
    +
    +            if (waitingToEmit()) {
    +                emit();
    +            }
    +        } else {
    +            LOG.debug("Spout not initialized. Not sending tuples until 
initialization completes");
    +        }
    +    }
    +
    +    private boolean commit() {
    +        return !consumerAutoCommitMode && 
commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
    +    }
    +
    +    private boolean poll() {
    +        return !waitingToEmit() && numUncommittedOffsets < 
kafkaSpoutConfig.getMaxUncommittedOffsets();
    +    }
    +
    +    private boolean waitingToEmit() {
    +        return waitingToEmit != null && waitingToEmit.hasNext();
    +    }
    +
    +    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
    +        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
    +        for (TopicPartition tp : consumerRecords.partitions()) {
    +            waitingToEmitList.addAll(consumerRecords.records(tp));
    +        }
    +        waitingToEmit = waitingToEmitList.iterator();
    +        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
    +    }
    +
    +    // ======== poll =========
    +    private ConsumerRecords<K, V> pollKafkaBroker() {
    +        doSeekRetriableTopicPartitions();
    +
    +        final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +        final int numPolledRecords = consumerRecords.count();
    +        LOG.debug("Polled [{}] records from Kafka. 
NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
    +        return consumerRecords;
    +    }
    +
    +    private void doSeekRetriableTopicPartitions() {
    +        final Set<TopicPartition> retriableTopicPartitions = 
retryService.retriableTopicPartitions();
    +
    +        for (TopicPartition rtp : retriableTopicPartitions) {
    +            final OffsetAndMetadata offsetAndMeta = 
acked.get(rtp).findNextCommitOffset();
    +            if (offsetAndMeta != null) {
    +                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // 
seek to the next offset that is ready to commit in next commit cycle
    +            } else {
    +                kafkaConsumer.seekToEnd(rtp);    // Seek to last committed 
offset
    +            }
    +        }
    +    }
    +
    +    // ======== emit  =========
    +    private void emit() {
    +        emitTupleIfNotEmitted(waitingToEmit.next());
    +        waitingToEmit.remove();
    --- End diff --
    
    @revans2 I apologize but I am really not following your thought. I am 
totally willing to accommodate what you have in mind, but I am not quite 
understanding what to do. I believe this should be a simple change. Can you 
please paste a code snippet, or some pseudo code, on how to do what you have in 
mind. I would be happy to integrate that. Thanks a lot.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to