Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2155#discussion_r124600901 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -116,217 +141,57 @@ private boolean emitNullTuples = false; public Builder(String bootstrapServers, String ... topics) { - this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics)); - } - - public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); - } - - public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, - SerializableDeserializer<V> valDes, Collection<String> topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); - } - - public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, - SerializableDeserializer<V> valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); + this(bootstrapServers, new NamedSubscription(topics)); } - public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, - SerializableDeserializer<V> valDes, Subscription subscription) { - this(bootstrapServers, keyDes, null, valDes, null, subscription); + public Builder(String bootstrapServers, Collection<String> topics) { + this(bootstrapServers, new NamedSubscription(topics)); } - public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, - Class<? extends Deserializer<V>> valDes, String ... topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); + public Builder(String bootstrapServers, Pattern topics) { + this(bootstrapServers, new PatternSubscription(topics)); } - public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, - Class<? extends Deserializer<V>> valDes, Collection<String> topics) { - this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics)); - } - - public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, - Class<? extends Deserializer<V>> valDes, Pattern topics) { - this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics)); - } - - public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, - Class<? extends Deserializer<V>> valDes, Subscription subscription) { - this(bootstrapServers, null, keyDes, null, valDes, subscription); - } - - private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, - Class<? extends Deserializer<K>> keyDesClazz, - SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) { + /** + * Create a KafkaSpoutConfig builder. + * @param bootstrapServers The bootstrap servers the consumer will use + * @param subscription The subscription defining which topics and partitions each spout instance will read. + */ + public Builder(String bootstrapServers, Subscription subscription) { kafkaProps = new HashMap<>(); if (bootstrapServers == null || bootstrapServers.isEmpty()) { throw new IllegalArgumentException("bootstrap servers cannot be null"); } kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - this.keyDes = keyDes; - this.keyDesClazz = keyDesClazz; - this.valueDes = valDes; - this.valueDesClazz = valDesClazz; this.subscription = subscription; - this.translator = new DefaultRecordTranslator<K,V>(); - } - - private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz, - SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) { - this.kafkaProps = new HashMap<>(builder.kafkaProps); - this.subscription = builder.subscription; - this.pollTimeoutMs = builder.pollTimeoutMs; - this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs; - this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy; - this.maxUncommittedOffsets = builder.maxUncommittedOffsets; - //this could result in a lot of class case exceptions at runtime, - // but because some translators will work no matter what the generics - // are I thought it best not to force someone to reset the translator - // when they change the key/value types. - this.translator = (RecordTranslator<K, V>) builder.translator; - this.retryService = builder.retryService; - this.keyDes = keyDes; - this.keyDesClazz = keyDesClazz; - this.valueDes = valueDes; - this.valueDesClazz = valueDesClazz; - } - - /** - * Specifying this key deserializer overrides the property key.deserializer. If you have - * set a custom RecordTranslator before calling this it may result in class cast - * exceptions at runtime. - */ - public <NewKeyT> Builder<NewKeyT,V> setKey(SerializableDeserializer<NewKeyT> keyDeserializer) { - return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz); - } - - /** - * Specify a class that can be instantiated to create a key.deserializer - * This is the same as setting key.deserializer, but overrides it. If you have - * set a custom RecordTranslator before calling this it may result in class cast - * exceptions at runtime. - */ - public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends Deserializer<NewKeyT>> clazz) { - return new Builder<>(this, null, clazz, valueDes, valueDesClazz); - } - - /** - * Specifying this value deserializer overrides the property value.deserializer. If you have - * set a custom RecordTranslator before calling this it may result in class cast - * exceptions at runtime. - */ - public <NewValueT> Builder<K,NewValueT> setValue(SerializableDeserializer<NewValueT> valueDeserializer) { - return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null); + this.translator = new DefaultRecordTranslator<>(); } /** - * Specify a class that can be instantiated to create a value.deserializer - * This is the same as setting value.deserializer, but overrides it. If you have - * set a custom RecordTranslator before calling this it may result in class cast - * exceptions at runtime. - */ - public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends Deserializer<NewValueT>> clazz) { - return new Builder<>(this, keyDes, keyDesClazz, null, clazz); - } - - /** - * Set a Kafka property config. + * Set a Kafka consumer property config. */ public Builder<K,V> setProp(String key, Object value) { kafkaProps.put(key, value); return this; } /** - * Set multiple Kafka property configs. + * Set multiple Kafka consumer property configs. */ public Builder<K,V> setProp(Map<String, Object> props) { kafkaProps.putAll(props); return this; } /** - * Set multiple Kafka property configs. + * Set multiple Kafka consumer property configs. */ public Builder<K,V> setProp(Properties props) { for (String name: props.stringPropertyNames()) { --- End diff -- Nice catch. How about we use putAll to put the properties into kafkaProps instead? I'm not sure that I like the method filtering the input properties, I'd rather throw an error if the key is not a String. At the same time I don't think it should be KafkaSpoutConfig's job to validate KafkaConsumer parameters. The KafkaConsumer will handle throwing the error if the keys are not Strings https://github.com/apache/kafka/blob/efb060c57f05d1d586bb14c016b0187c60f8e994/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L60
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---