[ https://issues.apache.org/jira/browse/STORM-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197978#comment-15197978 ]
ASF GitHub Bot commented on STORM-822:
--------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r56399639

    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000; // 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000; // 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; // Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000; // 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     * */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST }
    +
    +    /**
    +     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
    +     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
    +     * (it stores in memory the entire KafkaRecord, including data). BATCH will likely have less throughput but also use less memory.
    +     * The BATCH behavior is similar to the behavior of the previous Kafka Spout. The default value is STREAM.
    +     * <ul>
    +     * <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
    +     * threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been
    +     * committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
    +     * </li>
    +     * <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
    +     * </ul>
    +     */
    +    public enum PollStrategy {
    +        STREAM,
    +        BATCH
    --- End diff --

    Throughput will definitely take a dip with the BATCH config, and it may not be desirable for any use case. Actually, if we abstract the step of *fetching the tuple object from messageId* into an interface, we can have two implementations. On a retry, one implementation would fetch the failed message from memory. The other would fetch the failed message directly from Kafka, similar to the older spout. I have not thought it through completely yet, but it should be doable.
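
    To make the suggestion above a bit more concrete, here is a rough sketch of what such an interface could look like. Every name in it (FailedRecordFetcher, InMemoryFetcher, RefetchingFetcher, RetryFetchSketch) is hypothetical and not part of the pull request, and the Kafka lookup is replaced by a plain java.util.function.Function so the sketch stays self-contained. Treat it as an illustration of the idea under those assumptions, not as a proposed implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical abstraction: how the spout gets back the record behind a failed message id. */
interface FailedRecordFetcher<I, R> {
    void onEmit(I messageId, R record); // called when a record is first emitted
    void onAck(I messageId);            // called when the record has been acked
    R fetch(I messageId);               // called on fail; returns the record to re-emit, or null
}

/** Keeps every un-acked record in memory, so a retry needs no extra read. */
final class InMemoryFetcher<I, R> implements FailedRecordFetcher<I, R> {
    private final Map<I, R> pending = new HashMap<>();

    @Override public void onEmit(I messageId, R record) { pending.put(messageId, record); }
    @Override public void onAck(I messageId) { pending.remove(messageId); }
    @Override public R fetch(I messageId) { return pending.get(messageId); }

    /** Number of records currently held in memory. */
    int pendingCount() { return pending.size(); }
}

/** Stores nothing per record; on retry it reads the record again from the source. */
final class RefetchingFetcher<I, R> implements FailedRecordFetcher<I, R> {
    // Assumption: stands in for "seek to the offset and poll Kafka again".
    private final Function<I, R> source;

    RefetchingFetcher(Function<I, R> source) { this.source = source; }

    @Override public void onEmit(I messageId, R record) { /* nothing to remember */ }
    @Override public void onAck(I messageId) { /* nothing to forget */ }
    @Override public R fetch(I messageId) { return source.apply(messageId); }
}

final class RetryFetchSketch {
    public static void main(String[] args) {
        // A map from offset to payload plays the role of a Kafka partition here.
        Map<Long, String> fakePartition = new HashMap<>();
        fakePartition.put(42L, "payload-42");

        FailedRecordFetcher<Long, String> inMemory = new InMemoryFetcher<>();
        FailedRecordFetcher<Long, String> refetch = new RefetchingFetcher<>(fakePartition::get);

        inMemory.onEmit(42L, "payload-42");
        refetch.onEmit(42L, "payload-42");

        // Both strategies resolve the same failed message id to a record to re-emit.
        System.out.println(inMemory.fetch(42L));
        System.out.println(refetch.fetch(42L));
    }
}
```

    The trade-off would be the one already described in the Javadoc: the in-memory variant holds every pending record until it is acked, while the refetching variant keeps memory flat at the cost of another read on each retry.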

> As a storm developer I’d like to use the new kafka consumer API (0.8.3) to
> reduce dependencies and use long term supported kafka apis
> --------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: STORM-822
>                 URL: https://issues.apache.org/jira/browse/STORM-822
>             Project: Apache Storm
>          Issue Type: Story
>          Components: storm-kafka
>            Reporter: Thomas Becker
>            Assignee: Hugo Louro
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)