Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2155#discussion_r124600901
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
---
@@ -116,217 +141,57 @@
private boolean emitNullTuples = false;
public Builder(String bootstrapServers, String ... topics) {
- this(bootstrapServers, (SerializableDeserializer) null,
(SerializableDeserializer) null, new NamedSubscription(topics));
- }
-
- public Builder(String bootstrapServers,
SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String
... topics) {
- this(bootstrapServers, keyDes, valDes, new
NamedSubscription(topics));
- }
-
- public Builder(String bootstrapServers,
SerializableDeserializer<K> keyDes,
- SerializableDeserializer<V> valDes, Collection<String> topics)
{
- this(bootstrapServers, keyDes, valDes, new
NamedSubscription(topics));
- }
-
- public Builder(String bootstrapServers,
SerializableDeserializer<K> keyDes,
- SerializableDeserializer<V> valDes, Pattern topics) {
- this(bootstrapServers, keyDes, valDes, new
PatternSubscription(topics));
+ this(bootstrapServers, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers,
SerializableDeserializer<K> keyDes,
- SerializableDeserializer<V> valDes, Subscription subscription)
{
- this(bootstrapServers, keyDes, null, valDes, null,
subscription);
+ public Builder(String bootstrapServers, Collection<String> topics)
{
+ this(bootstrapServers, new NamedSubscription(topics));
}
- public Builder(String bootstrapServers, Class<? extends
Deserializer<K>> keyDes,
- Class<? extends Deserializer<V>> valDes, String ...
topics) {
- this(bootstrapServers, keyDes, valDes, new
NamedSubscription(topics));
+ public Builder(String bootstrapServers, Pattern topics) {
+ this(bootstrapServers, new PatternSubscription(topics));
}
- public Builder(String bootstrapServers, Class<? extends
Deserializer<K>> keyDes,
- Class<? extends Deserializer<V>> valDes,
Collection<String> topics) {
- this(bootstrapServers, keyDes, valDes, new
NamedSubscription(topics));
- }
-
- public Builder(String bootstrapServers, Class<? extends
Deserializer<K>> keyDes,
- Class<? extends Deserializer<V>> valDes, Pattern topics) {
- this(bootstrapServers, keyDes, valDes, new
PatternSubscription(topics));
- }
-
- public Builder(String bootstrapServers, Class<? extends
Deserializer<K>> keyDes,
- Class<? extends Deserializer<V>> valDes, Subscription
subscription) {
- this(bootstrapServers, null, keyDes, null, valDes,
subscription);
- }
-
- private Builder(String bootstrapServers,
SerializableDeserializer<K> keyDes,
- Class<? extends Deserializer<K>> keyDesClazz,
- SerializableDeserializer<V> valDes, Class<? extends
Deserializer<V>> valDesClazz, Subscription subscription) {
+ /**
+ * Create a KafkaSpoutConfig builder.
+ * @param bootstrapServers The bootstrap servers the consumer will
use
+ * @param subscription The subscription defining which topics and
partitions each spout instance will read.
+ */
+ public Builder(String bootstrapServers, Subscription subscription)
{
kafkaProps = new HashMap<>();
if (bootstrapServers == null || bootstrapServers.isEmpty()) {
throw new IllegalArgumentException("bootstrap servers
cannot be null");
}
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- this.keyDes = keyDes;
- this.keyDesClazz = keyDesClazz;
- this.valueDes = valDes;
- this.valueDesClazz = valDesClazz;
this.subscription = subscription;
- this.translator = new DefaultRecordTranslator<K,V>();
- }
-
- private Builder(Builder<?, ?> builder, SerializableDeserializer<K>
keyDes, Class<? extends Deserializer<K>> keyDesClazz,
- SerializableDeserializer<V> valueDes, Class<? extends
Deserializer<V>> valueDesClazz) {
- this.kafkaProps = new HashMap<>(builder.kafkaProps);
- this.subscription = builder.subscription;
- this.pollTimeoutMs = builder.pollTimeoutMs;
- this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
- this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
- this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
- //this could result in a lot of class case exceptions at
runtime,
- // but because some translators will work no matter what the
generics
- // are I thought it best not to force someone to reset the
translator
- // when they change the key/value types.
- this.translator = (RecordTranslator<K, V>) builder.translator;
- this.retryService = builder.retryService;
- this.keyDes = keyDes;
- this.keyDesClazz = keyDesClazz;
- this.valueDes = valueDes;
- this.valueDesClazz = valueDesClazz;
- }
-
- /**
- * Specifying this key deserializer overrides the property
key.deserializer. If you have
- * set a custom RecordTranslator before calling this it may result
in class cast
- * exceptions at runtime.
- */
- public <NewKeyT> Builder<NewKeyT,V>
setKey(SerializableDeserializer<NewKeyT> keyDeserializer) {
- return new Builder<>(this, keyDeserializer, null, valueDes,
valueDesClazz);
- }
-
- /**
- * Specify a class that can be instantiated to create a
key.deserializer
- * This is the same as setting key.deserializer, but overrides it.
If you have
- * set a custom RecordTranslator before calling this it may result
in class cast
- * exceptions at runtime.
- */
- public <NewKeyT> Builder<NewKeyT, V> setKey(Class<? extends
Deserializer<NewKeyT>> clazz) {
- return new Builder<>(this, null, clazz, valueDes,
valueDesClazz);
- }
-
- /**
- * Specifying this value deserializer overrides the property
value.deserializer. If you have
- * set a custom RecordTranslator before calling this it may result
in class cast
- * exceptions at runtime.
- */
- public <NewValueT> Builder<K,NewValueT>
setValue(SerializableDeserializer<NewValueT> valueDeserializer) {
- return new Builder<>(this, keyDes, keyDesClazz,
valueDeserializer, null);
+ this.translator = new DefaultRecordTranslator<>();
}
/**
- * Specify a class that can be instantiated to create a
value.deserializer
- * This is the same as setting value.deserializer, but overrides
it. If you have
- * set a custom RecordTranslator before calling this it may result
in class cast
- * exceptions at runtime.
- */
- public <NewValueT> Builder<K,NewValueT> setValue(Class<? extends
Deserializer<NewValueT>> clazz) {
- return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
- }
-
- /**
- * Set a Kafka property config.
+ * Set a Kafka consumer property config.
*/
public Builder<K,V> setProp(String key, Object value) {
kafkaProps.put(key, value);
return this;
}
/**
- * Set multiple Kafka property configs.
+ * Set multiple Kafka consumer property configs.
*/
public Builder<K,V> setProp(Map<String, Object> props) {
kafkaProps.putAll(props);
return this;
}
/**
- * Set multiple Kafka property configs.
+ * Set multiple Kafka consumer property configs.
*/
public Builder<K,V> setProp(Properties props) {
for (String name: props.stringPropertyNames()) {
--- End diff --
Nice catch. How about we use putAll to put the properties into kafkaProps
instead? I'm not sure that I like the method filtering the input properties;
I'd rather throw an error if the key is not a String. At the same time, I don't
think it should be KafkaSpoutConfig's job to validate KafkaConsumer parameters.
The KafkaConsumer will handle throwing the error if the keys are not Strings; see
https://github.com/apache/kafka/blob/efb060c57f05d1d586bb14c016b0187c60f8e994/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L60
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---