[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15158084#comment-15158084 ]
ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r53723733

    --- Diff: external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -0,0 +1,457 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    +import org.apache.kafka.common.TopicPartition;
    +import org.apache.storm.spout.SpoutOutputCollector;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.topology.base.BaseRichSpout;
    +import org.apache.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeSet;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.ThreadFactory;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.locks.Lock;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +public class KafkaSpout<K,V> extends BaseRichSpout {
    +    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    +    private static final Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new OffsetComparator();
    +
    +    // Storm
    +    private Map conf;
    +    private TopologyContext context;
    +    protected SpoutOutputCollector collector;
    +
    +    // Kafka
    +    private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V> kafkaSpoutConfig;
    +    private KafkaConsumer<K, V> kafkaConsumer;
    +
    +    // Bookkeeping
    +    private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
    +    private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> tupleBuilder;
    +    private transient ScheduledExecutorService commitOffsetsTask;
    +    private transient Lock ackCommitLock;
    +    private transient volatile boolean commit;
    +    private transient Map<org.apache.storm.kafka.spout.MessageId, Values> emittedTuples;        // Keeps a list of emitted tuples that are pending being acked or failed
    +    private transient Map<TopicPartition, Set<org.apache.storm.kafka.spout.MessageId>> failed;  // failed tuples. They stay in this list until success or max retries is reached
    +    private transient Map<TopicPartition, OffsetEntry> acked;        // emitted tuples that were successfully acked. These tuples will be committed by the commitOffsetsTask or on consumer rebalance
    +    private transient Set<org.apache.storm.kafka.spout.MessageId> blackList;  // all the tuples that are in traffic when the rebalance occurs will be added to black list to be disregarded when they are either acked or failed
    +    private transient int maxRetries;
    +
    +    public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V> kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V> tupleBuilder) {
    +        this.kafkaSpoutConfig = kafkaSpoutConfig;    // Pass in configuration
    +        this.kafkaSpoutStream = kafkaSpoutStream;
    +        this.tupleBuilder = tupleBuilder;
    +    }
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +        // Spout internals
    +        this.conf = conf;
    +        this.context = context;
    +        this.collector = collector;
    +
    +        // Bookkeeping objects
    +        emittedTuples = new HashMap<>();
    +        failed = new HashMap<>();
    +        acked = new HashMap<>();
    +        blackList = new HashSet<>();
    +        ackCommitLock = new ReentrantLock();
    +        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
    +
    +        // Kafka consumer
    +        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
    +                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
    +        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
    +
    +        // Create commit offsets task
    +        if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) {    // If it is auto commit, no need to commit offsets manually
    +            createCommitOffsetsTask();
    +        }
    +    }
    +
    +    // ======== Commit Offsets Task =======
    +
    +    private void createCommitOffsetsTask() {
    +        commitOffsetsTask = Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
    +        commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
    +            @Override
    +            public void run() {
    +                commit = true;
    +            }
    +        }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(), TimeUnit.MILLISECONDS);
    --- End diff --

@revans2 it may be possible, but it depends on what we want to do. My initial implementation used this thread to do the commits to Kafka. However, that causes a ConcurrentModificationException on the Kafka side, because the KafkaConsumer is single threaded and does not allow access from multiple threads. The immediate fix was to just create this volatile flag and leave it as is. This was meant to be temporary all along; I pushed it like this for an early review given the urgency of the patch. Nevertheless, we should discuss whether we would like to consider, in the future, using Kafka's commitAsync, which takes an OffsetCommitCallback as a parameter and could be used to manage the bookkeeping state. I suppose even if we consider this it won't be in this patch... but I wonder if this piece of functionality would be helpful for that scenario, if we ever want to consider it. I am referring to this comment as C_1 in another comment below.
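For the record, here is a rough sketch of what the commitAsync alternative could look like. This is only an illustration, not part of the patch: commitAsync(Map, OffsetCommitCallback) and OffsetCommitCallback are real new-consumer APIs, but the markCommitted helper and the surrounding bookkeeping are hypothetical.

{code:java}
// requires: import org.apache.kafka.clients.consumer.OffsetCommitCallback;
// Sketch only. This must still be called from the consumer thread (e.g. from
// nextTuple()), since KafkaConsumer is not safe for multithreaded access. The
// callback is invoked later on that same thread, inside a subsequent consumer
// call, so it could update the spout's bookkeeping state without needing the
// volatile commit flag or the ackCommitLock.
private void commitAckedOffsetsAsync(Map<TopicPartition, OffsetAndMetadata> toCommit) {
    kafkaConsumer.commitAsync(toCommit, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                // Leave the acked bookkeeping untouched so these offsets get retried
                LOG.error("Async offset commit failed", exception);
                return;
            }
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                // markCommitted is a hypothetical helper that would prune the
                // per-partition acked/OffsetEntry state up to the committed offset
                markCommitted(entry.getKey(), entry.getValue().offset());
            }
        }
    });
}
{code}

The possible advantage over the volatile flag is that commit completion and bookkeeping cleanup would happen in one place on the consumer thread; one caveat is that commitAsync callbacks can complete after a rebalance, so the callback would likely still need to consult the blackList.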
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)