Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1131#discussion_r57770615
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -0,0 +1,292 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + *   or more contributor license agreements.  See the NOTICE file
    + *   distributed with this work for additional information
    + *   regarding copyright ownership.  The ASF licenses this file
    + *   to you under the Apache License, Version 2.0 (the
    + *   "License"); you may not use this file except in compliance
    + *   with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *   Unless required by applicable law or agreed to in writing, software
    + *   distributed under the License is distributed on an "AS IS" BASIS,
    + *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
    + *   See the License for the specific language governing permissions and
    + *   limitations under the License.
    + */
    +
    +package org.apache.storm.kafka.spout;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * KafkaSpoutConfig defines the required configuration to connect a 
consumer to a consumer group, as well as the subscribing topics
    + */
    +public class KafkaSpoutConfig<K, V> implements Serializable {
    +    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            
// 2s
    +    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   
// 15s
    +    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     
// Retry forever
    +    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    
// 10,000 records
    +
    +    // Kafka property names
    +    public interface Consumer {
    +        String GROUP_ID = "group.id";
    +        String BOOTSTRAP_SERVERS = "bootstrap.servers";
    +        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    +        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
    +        String KEY_DESERIALIZER = "key.deserializer";
    +        String VALUE_DESERIALIZER = "value.deserializer";
    +    }
    +
    +    /**
    +     * The offset used by the Kafka spout in the first poll to Kafka 
broker. The choice of this parameter will
    +     * affect the number of consumer records returned in the first poll. 
By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
    +     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, 
UNCOMMITTED_LATEST. <br/>
    +     * <ul>
    +     * <li>EARLIEST means that the kafka spout polls records starting in 
the first offset of the partition, regardless of previous commits</li>
    +     * <li>LATEST means that the kafka spout polls records with offsets 
greater than the last offset in the partition, regardless of previous 
commits</li>
    +     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records 
from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as EARLIEST.</li>
    +     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records 
from the last committed offset, if any.
    +     * If no offset has been committed, it behaves as LATEST.</li>
    +     * </ul>
    +     * */
    +    public enum FirstPollOffsetStrategy {
    +        EARLIEST,
    +        LATEST,
    +        UNCOMMITTED_EARLIEST,
    +        UNCOMMITTED_LATEST }
    +
    +    // Kafka consumer configuration
    +    private final Map<String, Object> kafkaProps;
    +    private final Deserializer<K> keyDeserializer;
    +    private final Deserializer<V> valueDeserializer;
    +    private final long pollTimeoutMs;
    +
    +    // Kafka spout configuration
    +    private final long offsetCommitPeriodMs;
    +    private final int maxRetries;
    +    private final int maxUncommittedOffsets;
    +    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    +    private final KafkaSpoutStreams kafkaSpoutStreams;
    +    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    +    private final KafkaSpoutRetryService retryService;
    +
    +    private KafkaSpoutConfig(Builder<K,V> builder) {
    +        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
    +        this.keyDeserializer = builder.keyDeserializer;
    +        this.valueDeserializer = builder.valueDeserializer;
    +        this.pollTimeoutMs = builder.pollTimeoutMs;
    +        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    +        this.maxRetries = builder.maxRetries;
    +        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    +        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
    +        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    +        this.tuplesBuilder = builder.tuplesBuilder;
    +        this.retryService = builder.retryService;
    +    }
    +
    +    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, 
Object> kafkaProps) {
    +        // set defaults for properties not specified
    +        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
    +            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
    +        }
    +        return kafkaProps;
    +    }
    +
    +    public static class Builder<K,V> {
    +        private final Map<String, Object> kafkaProps;
    +        private Deserializer<K> keyDeserializer;
    +        private Deserializer<V> valueDeserializer;
    +        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
    +        private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
    +        private int maxRetries = DEFAULT_MAX_RETRIES;
    +        private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    +        private final KafkaSpoutStreams kafkaSpoutStreams;
    +        private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
    +        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    +        private final KafkaSpoutRetryService retryService;
    +
    +        /***
    +         * KafkaSpoutConfig defines the required configuration to connect 
a consumer to a consumer group, as well as the subscribing topics
    +         * The optional configuration can be specified using the set 
methods of this builder
    +         * @param kafkaProps    properties defining consumer connection to 
Kafka broker as specified in @see <a 
href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
    +         * @param kafkaSpoutStreams    streams to where the tuples are 
emitted for each tuple. Multiple topics can emit in the same stream.
    +         */
    +        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
    +                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, 
KafkaSpoutRetryService retryService) {
    --- End diff --
    
    Would it be possible to have retryService be optional with a default 
instead of forcing everyone to pass in an argument?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to