Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r54459488
  
    --- Diff: 
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,445 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.TreeSet;
    +import java.util.concurrent.TimeUnit;
    +
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
    +
    +public class KafkaSpout<K,V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<MessageId> OFFSET_COMPARATOR = new 
OffsetComparator();
    +
    +    // Storm
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private KafkaConsumer<K, V> kafkaConsumer;
    +    private transient boolean consumerAutoCommitMode;
    +    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
    +
    +    // Bookkeeping
    +    private KafkaSpoutStreams kafkaSpoutStreams;
    +    private KafkaTupleBuilder<K,V> tupleBuilder;
    +    private transient Timer timer;                                    // 
timer == null for auto commit mode
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed by 
the commitOffsetsTask or on consumer rebalance
    +    private transient int maxRetries;                                 // 
Max number of times a tuple is retried
    +    private transient boolean initialized;          // Flag indicating 
that the spout is still undergoing initialization process.
    +                                                    // Initialization is 
only complete after the first call to  
KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
    +
    +
    +    public KafkaSpout(KafkaSpoutConfig<K,V> kafkaSpoutConfig, 
KafkaSpoutStreams kafkaSpoutStreams, KafkaTupleBuilder<K,V> tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStreams = kafkaSpoutStreams;
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        initialized = false;
    +
    +        // Spout internals
    +        this.collector = collector;
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +
    +        // Offset management
    +        firstPollOffsetStrategy = 
kafkaSpoutConfig.getFirstPollOffsetStrategy();
    +        consumerAutoCommitMode = 
kafkaSpoutConfig.isConsumerAutoCommitMode();
    +
    +        if (!consumerAutoCommitMode) {     // If it is auto commit, no 
need to commit offsets manually
    +            timer = new Timer(kafkaSpoutConfig.getOffsetsCommitFreqMs(), 
500, TimeUnit.MILLISECONDS);
    +            acked = new HashMap<>();
    +        }
    +
    +        // Kafka consumer
    +        kafkaConsumer = new 
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
    +                kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
    +        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), 
new KafkaSpoutConsumerRebalanceListener());
    +        // Initial poll to get the consumer registration process going.
    +        // KafkaSpoutConsumerRebalanceListener will be called foloowing 
this poll upon partition registration
    +        kafkaConsumer.poll(0);
    +
    +        LOG.debug("Kafka Spout opened with the following configuration: 
{}", kafkaSpoutConfig.toString());
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if (initialized) {
    +            if(commit()) {
    +                commitOffsetsForAckedTuples();
    +            } else {
    +                emitTuples(poll());
    +            }
    +        } else {
    +            LOG.debug("Spout not initialized. Not sending tuples until 
initialization completes");
    +        }
    +    }
    +
    +    private boolean commit() {
    +        return !consumerAutoCommitMode && timer.expired();    // timer != 
null for non auto commit mode
    +    }
    +
    +    private ConsumerRecords<K, V> poll() {
    +        final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +        LOG.debug("Polled [{}] records from Kafka", 
consumerRecords.count());
    +        return consumerRecords;
    +    }
    +
    +    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
    +        for (TopicPartition tp : consumerRecords.partitions()) {
    +            final Iterable<ConsumerRecord<K, V>> records = 
consumerRecords.records(tp.topic());     // TODO Decide if want to give 
flexibility to emmit/poll either per topic or per partition
    +
    +            for (final ConsumerRecord<K, V> record : records) {
    +                if (record.offset() == 0 || record.offset() > 
acked.get(tp).committedOffset) {      // The first poll includes the last 
committed offset. This if avoids duplication
    +                    final List<Object> tuple = 
tupleBuilder.buildTuple(record);
    +                    final MessageId messageId = new MessageId(record, 
tuple);                                  // TODO don't create message for non 
acking mode. Should we support non acking mode?
    +
    +                    kafkaSpoutStreams.emit(collector, messageId);          
 // emits one tuple per record
    +                    LOG.debug("Emitted tuple [{}] for record [{}]", tuple, 
record);
    +                }
    +            }
    +        }
    +    }
    +
    +    // ======== Ack =======
    +    @Override
    +    public void ack(Object messageId) {
    +        final MessageId msgId = (MessageId) messageId;
    +        final TopicPartition tp = msgId.getTopicPartition();
    +
    +        if (!consumerAutoCommitMode) {  // Only need to keep track of 
acked tuples if commits are not done automatically
    +            acked.get(tp).add(msgId);
    +            LOG.debug("Adding acked message to [{}] to list of messages to 
be committed to Kafka", msgId);
    +        }
    +    }
    +
    +    // ======== Fail =======
    +
    +    @Override
    +    public void fail(Object messageId) {
    +        final MessageId msgId = (MessageId) messageId;
    +        if (msgId.numFails() < maxRetries) {
    +            msgId.incrementNumFails();
    +            kafkaSpoutStreams.emit(collector, msgId);
    +            LOG.debug("Retried tuple with message id [{}]", messageId);
    +        } else { // limit to max number of retries
    +            LOG.debug("Reached maximum number of retries. Message being 
marked as acked.");
    +            ack(msgId);
    +        }
    +    }
    +
    +    // ======== Activate / Deactivate =======
    +
    +    @Override
    +    public void activate() {
    +        // Shouldn't have to do anything for now. If specific cases need 
to be handled logic will go here
    +    }
    +
    +    @Override
    +    public void deactivate() {
    +        if(!consumerAutoCommitMode) {
    +            commitOffsetsForAckedTuples();
    +        }
    +    }
    +
    +    @Override
    +    public void close() {
    +        try {
    +            kafkaConsumer.wakeup();
    +            if(!consumerAutoCommitMode) {
    +                commitOffsetsForAckedTuples();
    +            }
    +        } finally {
    +            //remove resources
    +            kafkaConsumer.close();
    +        }
    +    }
    +
    +    // TODO must declare multiple output streams
    --- End diff --
    
    Do we need this now?  Or should we have a follow on JIRA for it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to