Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1808#discussion_r96904191
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -61,129 +66,244 @@
          * If no offset has been committed, it behaves as LATEST.</li>
          * </ul>
          * */
    -    public enum FirstPollOffsetStrategy {
    +    public static enum FirstPollOffsetStrategy {
             EARLIEST,
             LATEST,
             UNCOMMITTED_EARLIEST,
             UNCOMMITTED_LATEST }
    -
    -    // Kafka consumer configuration
    -    private final Map<String, Object> kafkaProps;
    -    private final Deserializer<K> keyDeserializer;
    -    private final Deserializer<V> valueDeserializer;
    -    private final long pollTimeoutMs;
    -
    -    // Kafka spout configuration
    -    private final long offsetCommitPeriodMs;
    -    private final int maxRetries;
    -    private final int maxUncommittedOffsets;
    -    private final long partitionRefreshPeriodMs;
    -    private final boolean manualPartitionAssignment;
    -    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
    -    private final KafkaSpoutStreams kafkaSpoutStreams;
    -    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -    private final KafkaSpoutRetryService retryService;
    -
    -    private KafkaSpoutConfig(Builder<K,V> builder) {
    -        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
    -        this.keyDeserializer = builder.keyDeserializer;
    -        this.valueDeserializer = builder.valueDeserializer;
    -        this.pollTimeoutMs = builder.pollTimeoutMs;
    -        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -        this.maxRetries = builder.maxRetries;
    -        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
    -        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
    -        this.manualPartitionAssignment = builder.manualPartitionAssignment;
    -        this.tuplesBuilder = builder.tuplesBuilder;
    -        this.retryService = builder.retryService;
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
String ... topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
         }
    -
    -    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, 
Object> kafkaProps) {
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
Collection<String> topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
    +    }
    +    
    +    public static Builder<String, String> builder(String bootstrapServers, 
Pattern topics) {
    +        return new Builder<>(bootstrapServers, StringDeserializer.class, 
StringDeserializer.class, topics);
    +    }
    +    
    +    private static Map<String, Object> 
setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
             // set defaults for properties not specified
    -        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
    -            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
    +        if 
(!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
    +            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");
             }
             return kafkaProps;
         }
    -
    +    
         public static class Builder<K,V> {
             private final Map<String, Object> kafkaProps;
    -        private SerializableDeserializer<K> keyDeserializer;
    -        private SerializableDeserializer<V> valueDeserializer;
    +        private Subscription subscription;
    +        private final SerializableDeserializer<K> keyDes;
    +        private final Class<? extends Deserializer<K>> keyDesClazz;
    +        private final SerializableDeserializer<V> valueDes;
    +        private final Class<? extends Deserializer<V>> valueDesClazz;
    +        private RecordTranslator<K, V> translator;
             private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
             private long offsetCommitPeriodMs = 
DEFAULT_OFFSET_COMMIT_PERIOD_MS;
             private int maxRetries = DEFAULT_MAX_RETRIES;
             private FirstPollOffsetStrategy firstPollOffsetStrategy = 
FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
    -        private final KafkaSpoutStreams kafkaSpoutStreams;
             private int maxUncommittedOffsets = 
DEFAULT_MAX_UNCOMMITTED_OFFSETS;
    +        private KafkaSpoutRetryService retryService = 
DEFAULT_RETRY_SERVICE;
             private long partitionRefreshPeriodMs = 
DEFAULT_PARTITION_REFRESH_PERIOD_MS;
    -        private boolean manualPartitionAssignment = false;
    -        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
    -        private final KafkaSpoutRetryService retryService;
    -
    -        /**
    -         * Please refer to javadoc in {@link #Builder(Map, 
KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
    -         * This constructor uses by the default the following 
implementation for {@link KafkaSpoutRetryService}:<p/>
    -         * {@code new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
    -         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
    -            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
    -                    new 
KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), 
TimeInterval.milliSeconds(2),
    -                            DEFAULT_MAX_RETRIES, 
TimeInterval.seconds(10)));
    +        
    +        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String 
... topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
             }
    -
    -        /***
    -         * KafkaSpoutConfig defines the required configuration to connect 
a consumer to a consumer group, as well as the subscribing topics
    -         * The optional configuration can be specified using the set 
methods of this builder
    -         * @param kafkaProps    properties defining consumer connection to 
Kafka broker as specified in @see <a 
href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html";>KafkaConsumer</a>
    -         * @param kafkaSpoutStreams    streams to where the tuples are 
emitted for each tuple. Multiple topics can emit in the same stream.
    -         * @param tuplesBuilder logic to build tuples from {@link 
ConsumerRecord}s.
    -         * @param retryService  logic that manages the retrial of failed 
tuples
    -         */
    -        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams 
kafkaSpoutStreams,
    -                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, 
KafkaSpoutRetryService retryService) {
    -            if (kafkaProps == null || kafkaProps.isEmpty()) {
    -                throw new IllegalArgumentException("Properties defining 
consumer connection to Kafka broker are required: " + kafkaProps);
    -            }
    -
    -            if (kafkaSpoutStreams == null)  {
    -                throw new IllegalArgumentException("Must specify stream 
associated with each topic. Multiple topics can emit to the same stream");
    -            }
    -
    -            if (tuplesBuilder == null) {
    -                throw new IllegalArgumentException("Must specify at last 
one tuple builder per topic declared in KafkaSpoutStreams");
    -            }
    -
    -            if (retryService == null) {
    -                throw new IllegalArgumentException("Must specify at 
implementation of retry service");
    +        
    +        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, 
Collection<String> topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern 
topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, 
Subscription subscription) {
    +           this(bootstrapServers, keyDes, null, valDes, null, 
subscription);
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... 
topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, 
Collection<String> topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern 
topics) {
    +            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
    +        }
    +        
    +        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription 
subscription) {
    +           this(bootstrapServers, null, keyDes, null, valDes, 
subscription);
    +        }
    +        
    +        private Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> 
keyDesClazz,
    +                   SerializableDeserializer<V> valDes, Class<? extends 
Deserializer<V>> valDesClazz, Subscription subscription) {
    +            kafkaProps = new HashMap<>();
    +            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
    +                throw new IllegalArgumentException("bootstrap servers 
cannot be null");
                 }
    +            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
    +            this.keyDes = keyDes;
    +            this.keyDesClazz = keyDesClazz;
    +            this.valueDes = valDes;
    +            this.valueDesClazz = valDesClazz;
    +            this.subscription = subscription;
    +            this.translator = new DefaultRecordTranslator<K,V>();
    +        }
     
    -            this.kafkaProps = kafkaProps;
    -            this.kafkaSpoutStreams = kafkaSpoutStreams;
    -            this.tuplesBuilder = tuplesBuilder;
    -            this.retryService = retryService;
    +        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    +                   SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz) {
    +            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    +            this.subscription = builder.subscription;
    +            this.pollTimeoutMs = builder.pollTimeoutMs;
    +            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    +            this.maxRetries = builder.maxRetries;
    +            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    +            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    +            //this could result in a lot of class case exceptions at 
runtime,
    +            // but this is the only way to really maintain API 
compatibility
    +            this.translator = (RecordTranslator<K, V>) builder.translator;
    +            this.retryService = builder.retryService;
    +            this.keyDes = keyDes;
    +            this.keyDesClazz = keyDesClazz;
    +            this.valueDes = valueDes;
    +            this.valueDesClazz = valueDesClazz;
             }
     
             /**
              * Specifying this key deserializer overrides the property 
key.deserializer
              */
    -        public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> 
keyDeserializer) {
    -            this.keyDeserializer = keyDeserializer;
    -            return this;
    +        public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> 
keyDeserializer) {
    --- End diff --
    
    I think what I was asking was "Why would you need to create a Builder with 
key type A and then later set a deserializer for keys of type B"? :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to