Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2155#discussion_r124600901
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
    @@ -116,217 +141,57 @@
             private boolean emitNullTuples = false;
     
             public Builder(String bootstrapServers, String ... topics) {
    -            this(bootstrapServers, (SerializableDeserializer) null, 
(SerializableDeserializer) null, new NamedSubscription(topics));
    -        }
    -
    -        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String 
... topics) {
    -            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Collection<String> topics) 
{
    -            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes,
    -            SerializableDeserializer<V> valDes, Subscription subscription) 
{
    -            this(bootstrapServers, keyDes, null, valDes, null, 
subscription);
    +        public Builder(String bootstrapServers, Collection<String> topics) 
{
    +            this(bootstrapServers, new NamedSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, String ... 
topics) {
    -            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    +        public Builder(String bootstrapServers, Pattern topics) {
    +            this(bootstrapServers, new PatternSubscription(topics));
             }
             
    -        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, 
Collection<String> topics) {
    -            this(bootstrapServers, keyDes, valDes, new 
NamedSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Pattern topics) {
    -            this(bootstrapServers, keyDes, valDes, new 
PatternSubscription(topics));
    -        }
    -        
    -        public Builder(String bootstrapServers, Class<? extends 
Deserializer<K>> keyDes,
    -                Class<? extends Deserializer<V>> valDes, Subscription 
subscription) {
    -            this(bootstrapServers, null, keyDes, null, valDes, 
subscription);
    -        }
    -        
    -        private Builder(String bootstrapServers, 
SerializableDeserializer<K> keyDes,
    -                Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valDes, Class<? extends 
Deserializer<V>> valDesClazz, Subscription subscription) {
    +        /**
    +         * Create a KafkaSpoutConfig builder.
    +         * @param bootstrapServers The bootstrap servers the consumer will 
use
    +         * @param subscription The subscription defining which topics and 
partitions each spout instance will read.
    +         */
    +        public Builder(String bootstrapServers, Subscription subscription) 
{
                 kafkaProps = new HashMap<>();
                 if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                     throw new IllegalArgumentException("bootstrap servers 
cannot be null");
                 }
                 kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valDes;
    -            this.valueDesClazz = valDesClazz;
                 this.subscription = subscription;
    -            this.translator = new DefaultRecordTranslator<K,V>();
    -        }
    -
    -        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> 
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
    -                SerializableDeserializer<V> valueDes, Class<? extends 
Deserializer<V>> valueDesClazz) {
    -            this.kafkaProps = new HashMap<>(builder.kafkaProps);
    -            this.subscription = builder.subscription;
    -            this.pollTimeoutMs = builder.pollTimeoutMs;
    -            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
    -            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
    -            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
    -            //this could result in a lot of class case exceptions at 
runtime,
    -            // but because some translators will work no matter what the 
generics
    -            // are I thought it best not to force someone to reset the 
translator
    -            // when they change the key/value types.
    -            this.translator = (RecordTranslator<K, V>) builder.translator;
    -            this.retryService = builder.retryService;
    -            this.keyDes = keyDes;
    -            this.keyDesClazz = keyDesClazz;
    -            this.valueDes = valueDes;
    -            this.valueDesClazz = valueDesClazz;
    -        }
    -
    -        /**
    -         * Specifying this key deserializer overrides the property 
key.deserializer. If you have
    -         * set a custom RecordTranslator before calling this it may result 
in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT,V> 
setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
    -            return new Builder<>(this, keyDeserializer, null, valueDes, 
valueDesClazz);
    -        }
    -        
    -        /**
    -         * Specify a class that can be instantiated to create a 
key.deserializer
    -         * This is the same as setting key.deserializer, but overrides it. 
If you have
    -         * set a custom RecordTranslator before calling this it may result 
in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends 
Deserializer<NewKeyT>> clazz) {
    -            return new Builder<>(this, null, clazz, valueDes, 
valueDesClazz);
    -        }
    -
    -        /**
    -         * Specifying this value deserializer overrides the property 
value.deserializer.  If you have
    -         * set a custom RecordTranslator before calling this it may result 
in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> 
setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
    -            return new Builder<>(this, keyDes, keyDesClazz, 
valueDeserializer, null);
    +            this.translator = new DefaultRecordTranslator<>();
             }
             
             /**
    -         * Specify a class that can be instantiated to create a 
value.deserializer
    -         * This is the same as setting value.deserializer, but overrides 
it.  If you have
    -         * set a custom RecordTranslator before calling this it may result 
in class cast
    -         * exceptions at runtime.
    -         */
    -        public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends 
Deserializer<NewValueT>> clazz) {
    -            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
    -        }
    -        
    -        /**
    -         * Set a Kafka property config.
    +         * Set a Kafka consumer property config. 
              */
             public Builder<K,V> setProp(String key, Object value) {
                 kafkaProps.put(key, value);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Map<String, Object> props) {
                 kafkaProps.putAll(props);
                 return this;
             }
             
             /**
    -         * Set multiple Kafka property configs.
    +         * Set multiple Kafka consumer property configs.
              */
             public Builder<K,V> setProp(Properties props) {
                 for (String name: props.stringPropertyNames()) {
    --- End diff --
    
    Nice catch. How about we use putAll to put the properties into kafkaProps 
instead? I'm not sure that I like the method filtering the input properties; 
I'd rather throw an error if a key is not a String. At the same time, I don't 
think it should be KafkaSpoutConfig's job to validate KafkaConsumer parameters. 
The KafkaConsumer will handle throwing the error if the keys are not Strings: 
https://github.com/apache/kafka/blob/efb060c57f05d1d586bb14c016b0187c60f8e994/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L60


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to