[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196225#comment-15196225
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56237896
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.TreeSet;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
    +
    +public class KafkaSpout<K, V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR 
= new OffsetComparator();
    +
    +    // Storm
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private transient KafkaConsumer<K, V> kafkaConsumer;
    +    private transient boolean consumerAutoCommitMode;
    +
    +
    +    // Bookkeeping
    +    private KafkaSpoutStreams kafkaSpoutStreams;
    +    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
    +    private transient Timer commitTimer;                                   
 // timer == null for auto commit mode
    +    private transient Timer logTimer;
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed 
periodically when the timer expires, on consumer rebalance, or on 
close/deactivate
    +    private transient int maxRetries;                                 // 
Max number of times a tuple is retried
    +    private transient boolean initialized;          // Flag indicating 
that the spout is still undergoing initialization process.
    +    // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
    +    private transient long numUncommittedOffsets;   // Number of offsets 
that have been polled and emitted but not yet been committed
    +    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    +    private transient PollStrategy pollStrategy;
    +
    +
    +    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, 
KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        initialized = false;
    +
    +        // Spout internals
    +        this.collector = collector;
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +        numUncommittedOffsets = 0;
    +        logTimer = new Timer(500, Math.min(1000, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
    +
    +        // Offset management
    +        firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
    +        pollStrategy = kafkaSpoutConfig.getPollStrategy();
    +        consumerAutoCommitMode = 
kafkaSpoutConfig.isConsumerAutoCommitMode();
    +
    +        if (!consumerAutoCommitMode) {     // If it is auto commit, no 
need to commit offsets manually
    +            commitTimer = new Timer(500, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
    +            acked = new HashMap<>();
    +        }
    +
    +        LOG.info("Kafka Spout opened with the following configuration: 
{}", kafkaSpoutConfig);
    +    }
    +
    +    // =========== Consumer Rebalance Listener - On the same thread as the 
caller ===========
    +
    +    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
    +        @Override
    +        public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions revoked. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +            if (!consumerAutoCommitMode && initialized) {
    +                initialized = false;
    +                commitOffsetsForAckedTuples();
    +            }
    +        }
    +
    +        @Override
    +        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions reassignment. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +
    +            initialize(partitions);
    +        }
    +
    +        private void initialize(Collection<TopicPartition> partitions) {
    +            if (!consumerAutoCommitMode) {
    +                acked.keySet().retainAll(partitions);   // remove from 
acked all partitions that are no longer assigned to this spout
    +            }
    +
    +            for (TopicPartition tp : partitions) {
    +                final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
    +                final long fetchOffset = doSeek(tp, committedOffset);
    +                setAcked(tp, fetchOffset);
    +            }
    +            initialized = true;
    +            LOG.debug("Initialization complete");
    +        }
    +
    +        /**
    +         * sets the cursor to the location dictated by the first poll 
strategy and returns the fetch offset
    +         */
    +        private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
    +            long fetchOffset;
    +            if (committedOffset != null) {             // offset was 
committed for this TopicPartition
    +                if (firstPollOffsetStrategy.equals(EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else {
    +                    // By default polling starts at the last committed 
offset. +1 to point fetch to the first uncommitted offset.
    +                    fetchOffset = committedOffset.offset() + 1;
    +                    kafkaConsumer.seek(tp, fetchOffset);
    +                }
    +            } else {    // no commits have ever been done, so start at the 
beginning or end depending on the strategy
    +                if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                }
    +                fetchOffset = kafkaConsumer.position(tp);
    +            }
    +            return fetchOffset;
    +        }
    +    }
    +
    +    private void setAcked(TopicPartition tp, long fetchOffset) {
    +        // If this partition was previously assigned to this spout, leave 
the acked offsets as they were to resume where it left off
    +        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
    +            acked.put(tp, new OffsetEntry(tp, fetchOffset));
    +        }
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if (initialized) {
    +            if (commit()) {
    +                commitOffsetsForAckedTuples();
    +            } else if (poll()) {
    --- End diff --
    
    Minor nit. I don't think poll, commit, and logging should be mutually 
exclusive.  It slows things down if we don't emit anything, and it also means 
that we may never log if we always emit something.
    
    ```
    if (commit()) {
        commitOffsetsForAckedTuples();
    }
    
    if (poll()) {
      emitTuples(pollKafkaBroker());
    }
    
    if (logTimer.isExpiredResetOnTrue()) {
      log()
    }
    ```


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to