[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199971#comment-15199971 ]
ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56542210

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribed topics.
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;           // 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;  // 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;    // Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;   // 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to the Kafka broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the Kafka spout polls records starting at the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the Kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the Kafka spout polls records from the last committed offset, if any.
    +     *     If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the Kafka spout polls records from the last committed offset, if any.
    +     *     If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST
    +    }
    +
    +    /**
    +     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
    +     * footprint of the Kafka spout.
    +     * The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
    +     * (it stores in memory the entire KafkaRecord, including data). BATCH will likely have lower throughput but also use less memory.
    +     * The BATCH behavior is similar to the behavior of the previous Kafka spout. The default value is STREAM.
    +     * <ul>
    +     * <li>STREAM: every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
    +     *     threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been
    +     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}</li>
    +     * <li>BATCH: only polls a new batch of records from Kafka once all the records that came in the previous poll have been acked.</li>
    +     * </ul>
    +     */
    +    public enum PollStrategy {
    +        STREAM,
    +        BATCH

    --- End diff --

@revans2 I disagree with using the existing KafkaSpout just because it works. It's been a nightmare to maintain and support the existing KafkaSpout: it goes through way too much delegation to achieve simple tasks. I am +1 for moving away from the existing KafkaSpout design.

_[0] - We fetch (poll) a new set of records from Kafka when we don't have something to emit from our buffer. The internal spout flow control assumes that we will emit around one tuple at a time when it calls .nextTuple(). So we have a buffer of messages to emit and emit them one at a time_

The above is not true: https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L140 emits all the available tuples in one nextTuple call (see the EmitAllDemo sketch below).

_[1] - If offsets 1 2 3 4 5 6 get fetched (polled) and the following occurs: 1 2 committed, 4 fails, 3 5 6 pending, and the exponential back-off for 4 has expired and 4 has been retried fewer than the maximum number of retries. [1.1] - We want to fetch again starting at 4, i.e. 4 5 6 ... and then filter out any messages that are not ready to be replayed. [1.2] - Let's say 3, which was pending, fails after the retry fetch described in [1.1] (4 5 6 ...) has happened. What do we do? We fetch again starting with 3 this time (3 4 5 ...)_

This is unnecessarily complex. If it's time to fetch again, we fetch from the last committed offset + 1. If 3 fails, it will already be included in that fetch, so we don't need to fetch again (see the SeekToCommittedSketch below).

_[5] - Does it make sense to even support retrying an offset forever? This can potentially cause an infinite cycle of fetch (poll) calls to Kafka across the network... but it will guarantee delivery up to that point (offset)._

Probably not, but the Storm core API + ackers contract is at-least-once delivery, so as long as we get a retriable exception from Kafka we should provide this capability. If it's something like an OffsetNotFound exception then we can move on, as the data on Kafka has been purged.

_We do NOT want at all to have the feature to retry from memory. That means that there is no need to ever keep tuples in memory, only offsets._

Given that we agreed on not having in-memory replays because failures are rare, supporting this won't give us much benefit, and having one more config option for the user won't help either.
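For reference, a small self-contained sketch of how the four FirstPollOffsetStrategy values in the diff above map to a starting offset for the first poll. Only the enum constants and their Javadoc semantics come from the diff; pickFirstOffset() and its parameters are illustrative stand-ins, not spout code, and whether the committed offset itself is re-polled depends on the commit convention the spout settles on.

```java
public class FirstPollOffsetDemo {
    // Copied from the KafkaSpoutConfig diff above
    enum FirstPollOffsetStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    /**
     * Illustrative helper (not spout code) mapping a strategy to a start offset.
     *
     * @param committed last committed offset, or null if nothing has been committed
     * @param earliest  first offset currently present in the partition
     * @param latest    offset one past the last record currently in the partition
     */
    static long pickFirstOffset(FirstPollOffsetStrategy strategy, Long committed, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:             return earliest;  // ignore previous commits
            case LATEST:               return latest;    // ignore previous commits
            case UNCOMMITTED_EARLIEST: return committed != null ? committed : earliest;
            case UNCOMMITTED_LATEST:   return committed != null ? committed : latest;
            default: throw new IllegalArgumentException(String.valueOf(strategy));
        }
    }

    public static void main(String[] args) {
        // No commits yet: UNCOMMITTED_EARLIEST falls back to EARLIEST
        System.out.println(pickFirstOffset(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, null, 0, 100)); // 0
        // With a commit at 42, the UNCOMMITTED_* strategies resume from it
        System.out.println(pickFirstOffset(FirstPollOffsetStrategy.UNCOMMITTED_LATEST, 42L, 0, 100));    // 42
    }
}
```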
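Similarly, the maxUncommittedOffsets gate that the STREAM Javadoc describes reduces to a counter check. The threshold value is DEFAULT_MAX_UNCOMMITTED_OFFSETS from the diff; the field and method names here are illustrative, not the spout's actual internals.

```java
public class StreamPollGateDemo {
    private final int maxUncommittedOffsets = 10_000; // DEFAULT_MAX_UNCOMMITTED_OFFSETS above
    private int numUncommittedOffsets = 0;            // offsets emitted but not yet committed

    // Called from nextTuple() before polling: a new poll is allowed only while
    // the number of pending offsets stays below the threshold
    boolean mayPoll() {
        return numUncommittedOffsets < maxUncommittedOffsets;
    }

    void onEmit(int recordCount)   { numUncommittedOffsets += recordCount; }
    void onCommit(int recordCount) { numUncommittedOffsets -= recordCount; }

    public static void main(String[] args) {
        StreamPollGateDemo gate = new StreamPollGateDemo();
        gate.onEmit(10_000);                 // reach the threshold
        System.out.println(gate.mayPoll()); // false: polling pauses until commits catch up
        gate.onCommit(1);
        System.out.println(gate.mayPoll()); // true: below the threshold again
    }
}
```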
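On point [0], a toy model in plain Java (not the actual Storm source) of the behavior the linked PartitionManager code exhibits as read above: a single call to next() drains the whole buffer, so many tuples can be emitted per nextTuple() rather than one at a time. Collector and Message stand in for the real Storm/Kafka types.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

public class EmitAllDemo {
    interface Collector { void emit(List<Object> tuple, long msgId); }

    static class Message {
        final long offset;
        final String payload;
        Message(long offset, String payload) { this.offset = offset; this.payload = payload; }
    }

    private final Deque<Message> waitingToEmit = new ArrayDeque<>();

    // Stand-in for the Kafka fetch that refills the buffer in the real spout
    private void fill() {
        for (long o = 0; o < 5; o++) {
            waitingToEmit.add(new Message(o, "value-" + o));
        }
    }

    // Analogous to PartitionManager.next(): the loop drains the buffer, so a
    // single call can emit many tuples, not one tuple per nextTuple()
    void next(Collector collector) {
        if (waitingToEmit.isEmpty()) {
            fill();
        }
        Message toEmit;
        while ((toEmit = waitingToEmit.pollFirst()) != null) {
            collector.emit(Collections.<Object>singletonList(toEmit.payload), toEmit.offset);
        }
    }

    public static void main(String[] args) {
        // Prints all five buffered messages from one next() call
        new EmitAllDemo().next((tuple, msgId) -> System.out.println(msgId + " -> " + tuple));
    }
}
```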
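And on point [1], a minimal sketch of the simpler "fetch from last committed offset + 1" scheme using the new consumer API. The broker address, topic, and group id are placeholders; the "+ 1" follows the convention in the comment above that the committed offset is the last successfully processed one (Kafka's own convention commits the next offset to consume, in which case the seek would use the committed offset directly).

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SeekToCommittedSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "storm-kafka-spout");       // placeholder group id
        props.put("enable.auto.commit", "false");         // the spout commits explicitly
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // placeholder topic
            consumer.assign(Collections.singletonList(tp));

            // Resume from last committed offset + 1 (per the convention above);
            // a previously failed offset such as 3 is naturally part of this fetch.
            OffsetAndMetadata committed = consumer.committed(tp);
            long next = (committed == null) ? 0L : committed.offset() + 1;
            consumer.seek(tp, next);

            ConsumerRecords<String, String> records = consumer.poll(2_000); // DEFAULT_POLL_TIMEOUT_MS
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```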
> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)