[ 
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157733#comment-15157733
 ] 

ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r53697397
  
    --- Diff: 
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 ---
    @@ -0,0 +1,457 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeSet;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +public class KafkaSpout<K,V> extends BaseRichSpout {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final 
Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new 
OffsetComparator();
    +
    +    // Storm
    +    private Map conf;
    +    private TopologyContext context;
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V> 
kafkaSpoutConfig;
    +    private KafkaConsumer<K, V> kafkaConsumer;
    +
    +    // Bookkeeping
    +    private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
    +    private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> 
tupleBuilder;
    +    private transient ScheduledExecutorService commitOffsetsTask;
    +    private transient Lock ackCommitLock;
    +    private transient volatile boolean commit;
    +    private transient Map<org.apache.storm.kafka.spout.MessageId, Values> 
emittedTuples;           // Keeps a list of emitted tuples that are pending 
being acked or failed
    +    private transient Map<TopicPartition, 
Set<org.apache.storm.kafka.spout.MessageId>> failed;     // failed tuples. They 
stay in this list until success or max retries is reached
    +    private transient Map<TopicPartition, OffsetEntry> acked;         // 
emitted tuples that were successfully acked. These tuples will be committed by 
the commitOffsetsTask or on consumer rebalance
    +    private transient Set<org.apache.storm.kafka.spout.MessageId> 
blackList;                       // all the tuples that are in traffic when the 
rebalance occurs will be added to black list to be disregarded when they are 
either acked or failed
    +    private transient int maxRetries;
    +
    +    public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V> 
kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream 
kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> 
tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass 
in configuration
    +        this.kafkaSpoutStream = kafkaSpoutStream;
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
    +        // Spout internals
    +        this.conf = conf;
    +        this.context = context;
    +        this.collector = collector;
    +
    +        // Bookkeeping objects
    +        emittedTuples = new HashMap<>();
    +        failed = new HashMap<>();
    +        acked = new HashMap<>();
    +        blackList = new HashSet<>();
    +        ackCommitLock = new ReentrantLock();
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +
    +        // Kafka consumer
    +        kafkaConsumer = new 
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
    +                kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
    +        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), 
new KafkaSpoutConsumerRebalanceListener());
    +
    +        // Create commit offsets task
    +        if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) {     // If it 
is auto commit, no need to commit offsets manually
    +            createCommitOffsetsTask();
    +        }
    +    }
    +
    +    // ======== Commit Offsets Task =======
    +
    +    private void createCommitOffsetsTask() {
    +        commitOffsetsTask = 
Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
    +        commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
    +            @Override
    +            public void run() {
    +                commit = true;
    +            }
    +        }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(), 
TimeUnit.MILLISECONDS);
    --- End diff --
    
    Can we replace this with a static Timer?  We might have multiple instances 
of the spout in a single worker, and having a thread for each one just to set a 
volatile boolean to true feels like overkill.


> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to 
> reduce dependencies and use long term supported kafka apis 
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to