[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196227#comment-15196227
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56238105
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    +import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.TreeSet;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
    +
    +public class KafkaSpout<K, V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR 
= new OffsetComparator();
    +
    +    // Storm
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private transient KafkaConsumer<K, V> kafkaConsumer;
    +    private transient boolean consumerAutoCommitMode;
    +
    +
    +    // Bookkeeping
    +    private KafkaSpoutStreams kafkaSpoutStreams;
    +    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
    +    private transient Timer commitTimer;                                   
 // timer == null for auto commit mode
    +    private transient Timer logTimer;
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed 
periodically when the timer expires, on consumer rebalance, or on 
close/deactivate
    +    private transient int maxRetries;                                 // 
Max number of times a tuple is retried
    +    private transient boolean initialized;          // Flag indicating 
that the spout is still undergoing initialization process.
    +    // Initialization is only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
    +    private transient long numUncommittedOffsets;   // Number of offsets 
that have been polled and emitted but not yet been committed
    +    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    +    private transient PollStrategy pollStrategy;
    +
    +
    +    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, 
KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        initialized = false;
    +
    +        // Spout internals
    +        this.collector = collector;
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +        numUncommittedOffsets = 0;
    +        logTimer = new Timer(500, Math.min(1000, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
    +
    +        // Offset management
    +        firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
    +        pollStrategy = kafkaSpoutConfig.getPollStrategy();
    +        consumerAutoCommitMode = 
kafkaSpoutConfig.isConsumerAutoCommitMode();
    +
    +        if (!consumerAutoCommitMode) {     // If it is auto commit, no 
need to commit offsets manually
    +            commitTimer = new Timer(500, 
kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
    +            acked = new HashMap<>();
    +        }
    +
    +        LOG.info("Kafka Spout opened with the following configuration: 
{}", kafkaSpoutConfig);
    +    }
    +
    +    // =========== Consumer Rebalance Listener - On the same thread as the 
caller ===========
    +
    +    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
    +        @Override
    +        public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions revoked. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +            if (!consumerAutoCommitMode && initialized) {
    +                initialized = false;
    +                commitOffsetsForAckedTuples();
    +            }
    +        }
    +
    +        @Override
    +        public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions reassignment. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +
    +            initialize(partitions);
    +        }
    +
    +        private void initialize(Collection<TopicPartition> partitions) {
    +            if (!consumerAutoCommitMode) {
    +                acked.keySet().retainAll(partitions);   // remove from 
acked all partitions that are no longer assigned to this spout
    +            }
    +
    +            for (TopicPartition tp : partitions) {
    +                final OffsetAndMetadata committedOffset = 
kafkaConsumer.committed(tp);
    +                final long fetchOffset = doSeek(tp, committedOffset);
    +                setAcked(tp, fetchOffset);
    +            }
    +            initialized = true;
    +            LOG.debug("Initialization complete");
    +        }
    +
    +        /**
    +         * sets the cursor to the location dictated by the first poll 
strategy and returns the fetch offset
    +         */
    +        private long doSeek(TopicPartition tp, OffsetAndMetadata 
committedOffset) {
    +            long fetchOffset;
    +            if (committedOffset != null) {             // offset was 
committed for this TopicPartition
    +                if (firstPollOffsetStrategy.equals(EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                    fetchOffset = kafkaConsumer.position(tp);
    +                } else {
    +                    // By default polling starts at the last committed 
offset. +1 to point fetch to the first uncommitted offset.
    +                    fetchOffset = committedOffset.offset() + 1;
    +                    kafkaConsumer.seek(tp, fetchOffset);
    +                }
    +            } else {    // no commits have ever been done, so start at the 
beginning or end depending on the strategy
    +                if (firstPollOffsetStrategy.equals(EARLIEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
    +                    kafkaConsumer.seekToBeginning(tp);
    +                } else if (firstPollOffsetStrategy.equals(LATEST) || 
firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
    +                    kafkaConsumer.seekToEnd(tp);
    +                }
    +                fetchOffset = kafkaConsumer.position(tp);
    +            }
    +            return fetchOffset;
    +        }
    +    }
    +
    +    private void setAcked(TopicPartition tp, long fetchOffset) {
    +        // If this partition was previously assigned to this spout, leave 
the acked offsets as they were to resume where it left off
    +        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
    +            acked.put(tp, new OffsetEntry(tp, fetchOffset));
    +        }
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if (initialized) {
    +            if (commit()) {
    +                commitOffsetsForAckedTuples();
    +            } else if (poll()) {
    --- End diff --
    
    OK I misunderstood what log was doing.  It is saying what state we are in 
with trace logging.  I am not sure we really need that.  I would rather see 
metrics about the values we care about instead of trace logging.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to