Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r53724182
  
    --- Diff: 
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,457 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeSet;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +public class KafkaSpout<K,V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final 
Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new 
OffsetComparator();
    +
    +    // Storm
    +    private Map conf;
    +    private TopologyContext context;
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V> 
kafkaSpoutConfig;
    +    private KafkaConsumer<K, V> kafkaConsumer;
    +
    +    // Bookkeeping
    +    private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
    +    private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> 
tupleBuilder;
    +    private transient ScheduledExecutorService commitOffsetsTask;
    +    private transient Lock ackCommitLock;
    +    private transient volatile boolean commit;
    +    private transient Map<org.apache.storm.kafka.spout.MessageId, Values> 
emittedTuples;           // Keeps a list of emitted tuples that are pending 
being acked or failed
    +    private transient Map<TopicPartition, 
Set<org.apache.storm.kafka.spout.MessageId>> failed;     // failed tuples. They 
stay in this list until success or max retries is reached
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed by 
the commitOffsetsTask or on consumer rebalance
    +    private transient Set<org.apache.storm.kafka.spout.MessageId> 
blackList;                       // all the tuples that are in traffic when the 
rebalance occurs will be added to black list to be disregarded when they are 
either acked or failed
    +    private transient int maxRetries;
    +
    +    public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V> 
kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream 
kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> 
tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStream = kafkaSpoutStream;
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        // Spout internals
    +        this.conf = conf;
    +        this.context = context;
    +        this.collector = collector;
    +
    +        // Bookkeeping objects
    +        emittedTuples = new HashMap<>();
    +        failed = new HashMap<>();
    +        acked = new HashMap<>();
    +        blackList = new HashSet<>();
    +        ackCommitLock = new ReentrantLock();
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +
    +        // Kafka consumer
    +        kafkaConsumer = new 
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
    +                kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
    +        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), 
new KafkaSpoutConsumerRebalanceListener());
    +
    +        // Create commit offsets task
    +        if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) {     // If it 
is auto commit, no need to commit offsets manually
    +            createCommitOffsetsTask();
    +        }
    +    }
    +
    +    // ======== Commit Offsets Task =======
    +
    +    private void createCommitOffsetsTask() {
    +        commitOffsetsTask = 
Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
    +        commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
    +            @Override
    +            public void run() {
    +                commit = true;
    +            }
    +        }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(), 
TimeUnit.MILLISECONDS);
    +    }
    +
    +    private ThreadFactory commitOffsetsThreadFactory() {
    +        return new ThreadFactory() {
    +            @Override
    +            public Thread newThread(Runnable r) {
    +                return new Thread(r, "kafka-spout-commit-offsets-thread");
    +            }
    +        };
    +    }
    +
    +    // ======== Next Tuple =======
    +
    +    @Override
    +    public void nextTuple() {
    +        if(commit) {
    +            commitAckedTuples();
    +        } else if (retry()) {              // Don't process new tuples 
until the failed tuples have all been acked
    +            retryFailedTuples();
    +        } else {
    +            emitTuples(poll());
    +        }
    +    }
    +
    +    private ConsumerRecords<K, V> poll() {
    +        final ConsumerRecords<K, V> consumerRecords = 
kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
    +        LOG.debug("Polled [{]} records from Kafka", 
consumerRecords.count());
    +        return consumerRecords;
    +    }
    +
    +    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
    +        for (TopicPartition tp : consumerRecords.partitions()) {
    +            final Iterable<ConsumerRecord<K, V>> records = 
consumerRecords.records(tp.topic());     // TODO Decide if want to give 
flexibility to emmit/poll either per topic or per partition
    +            for (ConsumerRecord<K, V> record : records) {
    +                final Values tuple = tupleBuilder.buildTuple(record);
    +                final org.apache.storm.kafka.spout.MessageId messageId = 
new org.apache.storm.kafka.spout.MessageId(record);                             
     // TODO don't create message for non acking mode. Should we support non 
acking mode?
    +                collector.emit(kafkaSpoutStream.getStreamId(), tuple, 
messageId);           // emits one tuple per record
    +                emittedTuples.put(messageId, tuple);
    +                LOG.info("HMCL - Emitted tuple for record {}", record);
    +            }
    +        }
    +    }
    +
    +    private boolean retry() {
    +        return failed.size() > 0;
    +    }
    +
    +    private void retryFailedTuples() {
    +        for (TopicPartition tp : failed.keySet()) {
    +            for (org.apache.storm.kafka.spout.MessageId msgId : 
failed.get(tp)) {
    +                if (isInBlackList(msgId)) {
    +                    removeFromBlacklist(msgId);
    +                    removeFromFailed(tp, msgId);
    +                } else {
    +                    final Values tuple = emittedTuples.get(msgId);
    +                    LOG.debug("Retrying tuple. [msgId={}, tuple={}]", 
msgId, tuple);
    +                    collector.emit(kafkaSpoutStream.getStreamId(), tuple, 
msgId);
    +                }
    +            }
    +        }
    +    }
    +
    +    // all the tuples that are in traffic when the rebalance occurs will 
be added
    +    // to black list to be disregarded when they are either acked or failed
    +    private boolean isInBlackList(org.apache.storm.kafka.spout.MessageId 
msgId) {
    +        return blackList.contains(msgId);
    +    }
    +
    +    private void 
removeFromBlacklist(org.apache.storm.kafka.spout.MessageId msgId) {
    +        blackList.remove(msgId);
    +    }
    +
    +    // ======== Ack =======
    +
    +    @Override
    +    public void ack(Object messageId) {
    +        final org.apache.storm.kafka.spout.MessageId msgId = 
(org.apache.storm.kafka.spout.MessageId) messageId;
    +        final TopicPartition tp = msgId.getTopicPartition();
    +
    +        if (isInBlackList(msgId)) {
    +            removeFromBlacklist(msgId);
    +        } else {
    +            addAckedTuples(tp, msgId);
    +            // Removed acked tuples from the emittedTuples data structure
    +            emittedTuples.remove(msgId);
    +            // if this acked msg is a retry, remove it from failed data 
structure
    +            removeFromFailed(tp, msgId);
    +        }
    +    }
    +
    +    private void addAckedTuples(TopicPartition tp, 
org.apache.storm.kafka.spout.MessageId msgId) {
    +        // lock because ack and commit happen in different threads
    +        ackCommitLock.lock();
    +        try {
    +            if (!acked.containsKey(tp)) {
    +                acked.put(tp, new OffsetEntry(tp));
    +            }
    +            acked.get(tp).add(msgId);
    +        } finally {
    +            ackCommitLock.unlock();
    +        }
    +    }
    +
    +    // ======== Fail =======
    +
    +    @Override
    +    public void fail(Object messageId) {
    +        final org.apache.storm.kafka.spout.MessageId msgId = 
(org.apache.storm.kafka.spout.MessageId) messageId;
    +
    +        if (isInBlackList(msgId)) {
    +            removeFromBlacklist(msgId);
    +        } else {
    +            final TopicPartition tp = msgId.getTopicPartition();
    +            // limit to max number of retries
    +            if (msgId.numFails() >= maxRetries) {
    +                LOG.debug("Reached the maximum number of retries. Adding 
[{]} to list of messages to be committed to kafka", msgId);
    +                ack(msgId);
    +                removeFromFailed(tp, msgId);
    +            } else {
    +                addToFailed(tp, msgId);
    +            }
    +        }
    +    }
    +
    +    private void addToFailed(TopicPartition tp, 
org.apache.storm.kafka.spout.MessageId msgId) {
    +        if (!failed.containsKey(tp)) {
    +            failed.put(tp, new 
HashSet<org.apache.storm.kafka.spout.MessageId>());
    +        }
    +        final Set<org.apache.storm.kafka.spout.MessageId> msgIds = 
failed.get(tp);
    +        if (msgIds.contains(msgId)) {      // do this to update the 
counter of the message
    +            msgIds.remove(msgId);
    +        }
    +        msgId.incrementNumFails();        // increment number of failures 
counter
    +        msgIds.add(msgId);
    +    }
    +
    +    private void removeFromFailed(TopicPartition tp, 
org.apache.storm.kafka.spout.MessageId msgId) {
    +        if (failed.containsKey(tp)) {
    +            final Set<org.apache.storm.kafka.spout.MessageId> msgIds = 
failed.get(tp);
    +            msgIds.remove(msgId);
    +            if (msgIds.isEmpty()) {
    +                failed.remove(tp);
    +            }
    +            LOG.debug("Removing from failed list [{}, {}]", tp, msgId);
    +        }
    +    }
    +
    +    // ======== Activate / Deactivate =======
    +
    +    @Override
    +    public void activate() {
    +        // Shouldn't have to do anything for now. If specific cases need 
to be handled logic will go here
    +    }
    +
    +    @Override
    +    public void deactivate() {
    +        commitAckedTuples();
    +    }
    +
    +    @Override
    +    public void close() {
    +        try {
    +            kafkaConsumer.wakeup();
    +            commitAckedTuples();
    +        } finally {
    +            //remove resources
    +            kafkaConsumer.close();
    +        }
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        declarer.declareStream(kafkaSpoutStream.getStreamId(), 
kafkaSpoutStream.getOutputFields());
    +    }
    +
    +    // ====== Private helper methods ======
    +
    +    private void commitAckedTuples() {
    +        final Map<TopicPartition, OffsetAndMetadata> toCommitOffsets = new 
HashMap<>();
    +
    +        LOG.debug("Committing acked offsets to Kafka");
    +        // lock because ack and commit happen in different threads
    +        ackCommitLock.lock();
    +        try {
    +            for (TopicPartition tp : acked.keySet()) {
    +                final OffsetEntry offsetEntry = acked.get(tp);
    +                OffsetAndMetadata offsetAndMetadata = 
offsetEntry.findOffsetToCommit();
    +                if (offsetAndMetadata != null) {
    +                    toCommitOffsets.put(tp, offsetAndMetadata);
    +                }
    +            }
    +
    +            if (!toCommitOffsets.isEmpty()) {
    +                kafkaConsumer.commitSync(toCommitOffsets);
    +                LOG.debug("Offsets successfully committed to Kafka [{]}", 
toCommitOffsets);
    +            }
    +
    +            // Instead of iterating again, we could commit each 
TopicPartition in the prior loop,
    +            // but the multiple networks calls should be more expensive 
than iteration twice over a small loop
    +            for (TopicPartition tp : acked.keySet()) {
    +                OffsetEntry offsetEntry = acked.get(tp);
    +                offsetEntry.updateAckedState(toCommitOffsets.get(tp));
    +                updateAckedState(tp, offsetEntry);
    +            }
    +        } catch (Exception e) {
    +            LOG.error("Exception occurred while committing to Kafka 
offsets for acked tuples ", e);
    +        } finally {
    +            commit = false;
    +            ackCommitLock.unlock();
    +        }
    +    }
    +
    +    private void updateAckedState(TopicPartition tp, OffsetEntry 
offsetEntry) {
    +        // lock because ack and commit happen in different threads
    +        ackCommitLock.lock();
    +        try {
    +            if (offsetEntry.isEmpty()) {
    +                acked.remove(tp);
    +            }
    +        } finally {
    +            ackCommitLock.unlock();
    +        }
    +    }
    +
    +    // ======= Offsets Commit Management ==========
    +
    +    private static class OffsetComparator implements 
Comparator<org.apache.storm.kafka.spout.MessageId> {
    +        public int compare(org.apache.storm.kafka.spout.MessageId m1, 
org.apache.storm.kafka.spout.MessageId m2) {
    +            return m1.offset() < m2.offset() ? -1 : m1.offset() == 
m2.offset() ? 0 : 1;
    +        }
    +    }
    +
    +    /** This class is not thread safe */
    +    // Although this class is called by multiple (2) threads, all the 
calling methods are properly synchronized
    +    private class OffsetEntry {
    +        private long committedOffset;          // last offset committed to 
Kafka
    +        private long toCommitOffset;                // last offset to be 
committed in the next commit operation
    +        private final Set<org.apache.storm.kafka.spout.MessageId> 
ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);     // sort messages by ascending 
order of offset
    +        private Set<org.apache.storm.kafka.spout.MessageId> toCommitMsgs = 
new TreeSet<>(OFFSET_COMPARATOR);        // Messages that contain the offsets 
to be committed in the next commit operation
    +
    +        public OffsetEntry(TopicPartition tp) {
    +            OffsetAndMetadata committed = kafkaConsumer.committed(tp);
    +            committedOffset = committed == null ? -1 : committed.offset();
    +            LOG.debug("Created OffsetEntry for [topic-partition={}, 
last-committed-offset={}]", tp, committedOffset);
    +        }
    +
    +        public void add(org.apache.storm.kafka.spout.MessageId msgId) {    
      // O(Log N)
    +            ackedMsgs.add(msgId);
    +        }
    +
    +        /**
    +         * This method has side effects. The method updateAckedState 
should be called after this method.
    +         */
    +        public OffsetAndMetadata findOffsetToCommit() {
    +            long currOffset;
    +            OffsetAndMetadata offsetAndMetadata = null;
    +            toCommitMsgs = new TreeSet<>(OFFSET_COMPARATOR);
    +            toCommitOffset = committedOffset;
    +            org.apache.storm.kafka.spout.MessageId toCommitMsg = null;
    +
    +            for (org.apache.storm.kafka.spout.MessageId ackedMsg : 
ackedMsgs) {  // for K matching messages complexity is K*(Log*N). K <= N
    +                if ((currOffset = ackedMsg.offset()) == toCommitOffset || 
currOffset == toCommitOffset + 1) {    // found the next offset to commit
    +                    toCommitMsgs.add(ackedMsg);
    +                    toCommitMsg = ackedMsg;
    +                    toCommitOffset = currOffset;
    +                } else if (ackedMsg.offset() > toCommitOffset + 1) {    // 
offset found is not continuous to the offsets listed to go in the next commit, 
so stop search
    +                    break;
    +                } else {
    +                    LOG.debug("Unexpected offset found [{]}. {}", 
ackedMsg.offset(), toString());
    +                    break;
    +                }
    +            }
    +
    +            if (!toCommitMsgs.isEmpty()) {
    +                offsetAndMetadata = new OffsetAndMetadata(toCommitOffset, 
toCommitMsg.getMetadata(Thread.currentThread()));
    +                LOG.debug("Last offset to be committed in the next commit 
call: [{]}", offsetAndMetadata.offset());
    +                LOG.trace(toString());
    +            } else {
    +                LOG.debug("No offsets found ready to commit");
    +                LOG.trace(toString());
    +            }
    +            return offsetAndMetadata;
    +        }
    +
    +        /**
    +         * This method has side effects and should be called after 
findOffsetToCommit
    +         */
    +        public void updateAckedState(OffsetAndMetadata offsetAndMetadata) {
    +            if (offsetAndMetadata != null) {
    +                committedOffset = offsetAndMetadata.offset();
    +                toCommitMsgs = new TreeSet<>(OFFSET_COMPARATOR);
    +                ackedMsgs.removeAll(toCommitMsgs);
    +            }
    +        }
    +
    +        public boolean isEmpty() {
    +            return ackedMsgs.isEmpty();
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "OffsetEntry{" +
    +                    "committedOffset=" + committedOffset +
    +                    ", toCommitOffset=" + toCommitOffset +
    +                    ", ackedMsgs=" + ackedMsgs +
    +                    ", toCommitMsgs=" + toCommitMsgs +
    +                    '}';
    +        }
    +    }
    +
    +    // =========== Consumer Rebalance Listener - On the same thread as the 
caller ===========
    +
    +    private class KafkaSpoutConsumerRebalanceListener implements 
ConsumerRebalanceListener {
    +        @Override
    +        public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {
    +            LOG.debug("Partitions revoked. [consumer-group={}, 
consumer={}, topic-partitions={}]",
    +                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, 
partitions);
    +            commitAckedTuples();  // commit acked records,
    +            copyEmittedTuplesToBlackList(); // all the tuples that are in 
traffic when the rebalance occurs will be added to black list to avoid 
duplication
    +            clearFailed();   // remove all failed tuples form list to 
avoid duplication
    +            clearEmittedTuples();    // clear emitted tuples
    --- End diff --
    
    This comment is a continuation of C1.
    
    @revans2 As I said in C1, the commits to Kafka, as well as all its 
callbacks are done in the callers thread. If I am not missing anything that 
means that all the calls are done in the same thread, which implies that in 
practice this class is single threaded, and therefore we do not need any 
locking whatsoever. Is there any reason why we may need to lock?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to