[
https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157733#comment-15157733
]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r53697397
--- Diff:
external/storm-kafka-new-consumer-api/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class KafkaSpout<K,V> extends BaseRichSpout {
+ private static final Logger LOG =
LoggerFactory.getLogger(KafkaSpout.class);
+ private static final
Comparator<org.apache.storm.kafka.spout.MessageId> OFFSET_COMPARATOR = new
OffsetComparator();
+
+ // Storm
+ private Map conf;
+ private TopologyContext context;
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final org.apache.storm.kafka.spout.KafkaSpoutConfig<K, V>
kafkaSpoutConfig;
+ private KafkaConsumer<K, V> kafkaConsumer;
+
+ // Bookkeeping
+ private org.apache.storm.kafka.spout.KafkaSpoutStream kafkaSpoutStream;
+ private org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V>
tupleBuilder;
+ private transient ScheduledExecutorService commitOffsetsTask;
+ private transient Lock ackCommitLock;
+ private transient volatile boolean commit;
+ private transient Map<org.apache.storm.kafka.spout.MessageId, Values>
emittedTuples; // Keeps a list of emitted tuples that are pending
being acked or failed
+ private transient Map<TopicPartition,
Set<org.apache.storm.kafka.spout.MessageId>> failed; // failed tuples. They
stay in this list until success or max retries is reached
+ private transient Map<TopicPartition, OffsetEntry> acked; //
emitted tuples that were successfully acked. These tuples will be committed by
the commitOffsetsTask or on consumer rebalance
+ private transient Set<org.apache.storm.kafka.spout.MessageId>
blackList; // all the tuples that are in traffic when the
rebalance occurs will be added to black list to be disregarded when they are
either acked or failed
+ private transient int maxRetries;
+
+ public KafkaSpout(org.apache.storm.kafka.spout.KafkaSpoutConfig<K,V>
kafkaSpoutConfig, org.apache.storm.kafka.spout.KafkaSpoutStream
kafkaSpoutStream, org.apache.storm.kafka.spout.KafkaTupleBuilder<K,V>
tupleBuilder) {
+ this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass
in configuration
+ this.kafkaSpoutStream = kafkaSpoutStream;
+ this.tupleBuilder = tupleBuilder;
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
+ // Spout internals
+ this.conf = conf;
+ this.context = context;
+ this.collector = collector;
+
+ // Bookkeeping objects
+ emittedTuples = new HashMap<>();
+ failed = new HashMap<>();
+ acked = new HashMap<>();
+ blackList = new HashSet<>();
+ ackCommitLock = new ReentrantLock();
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+
+ // Kafka consumer
+ kafkaConsumer = new
KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(),
kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(),
new KafkaSpoutConsumerRebalanceListener());
+
+ // Create commit offsets task
+ if (!kafkaSpoutConfig.isConsumerAutoCommitMode()) { // If it
is auto commit, no need to commit offsets manually
+ createCommitOffsetsTask();
+ }
+ }
+
+ // ======== Commit Offsets Task =======
+
+ private void createCommitOffsetsTask() {
+ commitOffsetsTask =
Executors.newSingleThreadScheduledExecutor(commitOffsetsThreadFactory());
+ commitOffsetsTask.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ commit = true;
+ }
+ }, 1000, kafkaSpoutConfig.getOffsetsCommitFreqMs(),
TimeUnit.MILLISECONDS);
--- End diff --
Can we replace this with a static Timer? We might have multiple instances
of the spout in a single worker, and having a thread per each just to set a
volatile boolean to true, feels like overkill.
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)