[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216764#comment-15216764 ]
ASF GitHub Bot commented on STORM-822:
--------------------------------------
Github user hmcl commented on a diff in the pull request:
https://github.com/apache/storm/pull/1131#discussion_r57791718
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+ // Storm
+ protected SpoutOutputCollector collector;
+
+ // Kafka
+ private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private transient KafkaConsumer<K, V> kafkaConsumer;
+ private transient boolean consumerAutoCommitMode;
+
+
+ // Bookkeeping
+    private transient int maxRetries;                                  // Max number of times a tuple is retried
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy; // Strategy to determine the fetch offset of the first poll realized by the spout upon activation
+    private transient KafkaSpoutRetryService retryService;             // Class that has the logic to handle tuple failure
+    private transient Timer commitTimer;                               // timer == null for auto commit mode
+    private transient boolean initialized;                             // Flag indicating that the spout is still undergoing initialization process.
+    // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+
+    private KafkaSpoutStreams kafkaSpoutStreams;                       // Object that wraps all the logic to declare output fields and emit tuples
+    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;     // Object that contains the logic to build tuples for each ConsumerRecord
+
+    private transient Map<TopicPartition, OffsetEntry> acked;          // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    private transient Set<KafkaSpoutMessageId> emitted;                // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient long numUncommittedOffsets;                      // Number of offsets that have been polled and emitted but not yet committed
+
+
+ public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
+ this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ }
+
+ @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ initialized = false;
+
+ // Spout internals
+ this.collector = collector;
+ maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+ numUncommittedOffsets = 0;
+
+ // Offset management
+        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+ // Retries management
+ retryService = kafkaSpoutConfig.getRetryService();
+
+ // Tuples builder delegate
+ tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
+        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
+            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+ }
+
+ acked = new HashMap<>();
+ emitted = new HashSet<>();
+ waitingToEmit = Collections.emptyListIterator();
+
+        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+ }
+
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+ if (!consumerAutoCommitMode && initialized) {
+ initialized = false;
+ commitOffsetsForAckedTuples();
+ }
+ }
+
+ @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+ initialize(partitions);
+ }
+
+ private void initialize(Collection<TopicPartition> partitions) {
+ if (!consumerAutoCommitMode) {
+                acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
+ }
+
+ retryService.remove(partitions);
+
+ for (TopicPartition tp : partitions) {
+                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+ final long fetchOffset = doSeek(tp, committedOffset);
+ setAcked(tp, fetchOffset);
+ }
+ initialized = true;
+ LOG.debug("Initialization complete");
+ }
+
+ /**
+         * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+ */
+        private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+ long fetchOffset;
+            if (committedOffset != null) {             // offset was committed for this TopicPartition
+ if (firstPollOffsetStrategy.equals(EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else if (firstPollOffsetStrategy.equals(LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ fetchOffset = kafkaConsumer.position(tp);
+ } else {
+                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+ fetchOffset = committedOffset.offset() + 1;
+ kafkaConsumer.seek(tp, fetchOffset);
+ }
+            } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
+                if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+ kafkaConsumer.seekToBeginning(tp);
+                } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+ kafkaConsumer.seekToEnd(tp);
+ }
+ fetchOffset = kafkaConsumer.position(tp);
+ }
+ return fetchOffset;
+ }
+ }
+
+ private void setAcked(TopicPartition tp, long fetchOffset) {
+        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+ if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+ acked.put(tp, new OffsetEntry(tp, fetchOffset));
+ }
+ }
+
+ // ======== Next Tuple =======
+
+ @Override
+ public void nextTuple() {
+ if (initialized) {
+ if (commit()) {
+ commitOffsetsForAckedTuples();
+ }
+
+ if (poll()) {
+ setWaitingToEmit(pollKafkaBroker());
+ }
+
+ if (waitingToEmit()) {
+ emit();
+ }
+ } else {
+            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+ }
+ }
+
+ private boolean commit() {
+        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+ }
+
+ private boolean poll() {
+        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+ }
+
+ private boolean waitingToEmit() {
+ return waitingToEmit != null && waitingToEmit.hasNext();
+ }
+
+ public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+ List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+ for (TopicPartition tp : consumerRecords.partitions()) {
+ waitingToEmitList.addAll(consumerRecords.records(tp));
+ }
+ waitingToEmit = waitingToEmitList.iterator();
+ LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
+ }
+
+ // ======== poll =========
+ private ConsumerRecords<K, V> pollKafkaBroker() {
+ doSeekRetriableTopicPartitions();
+
+        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+ final int numPolledRecords = consumerRecords.count();
+        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+ return consumerRecords;
+ }
+
+ private void doSeekRetriableTopicPartitions() {
+        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+ for (TopicPartition rtp : retriableTopicPartitions) {
+            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+ if (offsetAndMeta != null) {
+                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);    // seek to the next offset that is ready to commit in next commit cycle
+ } else {
+                kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
+ }
+ }
+ }
+
+ // ======== emit =========
+ private void emit() {
+ emitTupleIfNotEmitted(waitingToEmit.next());
+ waitingToEmit.remove();
--- End diff ---
@revans2 can you please detail what you mean by too slow? In the previous
implementation we had a `for` loop iterating over all the records fetched
(polled) and emitting all of them during a single call to `nextTuple()`. You
suggested that we emit only one tuple per call to `nextTuple()`, which is what
the current implementation does. It waits at most the amount of time between
two consecutive calls to `nextTuple()` before either fetching (polling) or
emitting the next record. What exactly do you mean by too slow?
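For clarity, the contrast between the two approaches can be sketched as follows.
This is a minimal illustration, not the PR code: the class, `pollKafkaBroker()`,
and `emitRecord()` are hypothetical stand-ins for the spout's internals.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of the two emission patterns discussed above.
abstract class EmitPatterns<R> {
    abstract List<R> pollKafkaBroker(); // hypothetical: fetch the next batch from Kafka
    abstract void emitRecord(R record); // hypothetical: emit one tuple to the collector

    // Previous implementation: emit the whole polled batch in one nextTuple() call.
    void nextTupleBatch() {
        for (R record : pollKafkaBroker()) {
            emitRecord(record);                           // records emitted back to back
        }
    }

    // Current implementation: emit at most one record per nextTuple() call,
    // yielding control back to Storm between consecutive emits.
    private Iterator<R> waitingToEmit = Collections.emptyIterator();

    void nextTupleSingle() {
        if (!waitingToEmit.hasNext()) {
            waitingToEmit = pollKafkaBroker().iterator(); // refill only when drained
        }
        if (waitingToEmit.hasNext()) {
            emitRecord(waitingToEmit.next());             // one record, then return
        }
    }
}
```

With the one-record-per-call pattern, the extra latency per record is bounded by
the interval between two consecutive `nextTuple()` invocations, which is the
trade-off in question.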
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-822
> URL: https://issues.apache.org/jira/browse/STORM-822
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Hugo Louro
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)