Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2476#discussion_r158125370
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
---
@@ -28,33 +28,34 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TridentKafkaStateFactory implements StateFactory {
+public class TridentKafkaStateFactory<K, V> implements StateFactory {
+ private static final long serialVersionUID = -3613240970062343385L;
private static final Logger LOG =
LoggerFactory.getLogger(TridentKafkaStateFactory.class);
- private TridentTupleToKafkaMapper mapper;
+ private TridentTupleToKafkaMapper<K, V> mapper;
private KafkaTopicSelector topicSelector;
private Properties producerProperties = new Properties();
- public TridentKafkaStateFactory
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+ public TridentKafkaStateFactory<K, V>
withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
this.mapper = mapper;
return this;
}
- public TridentKafkaStateFactory
withKafkaTopicSelector(KafkaTopicSelector selector) {
+ public TridentKafkaStateFactory<K, V>
withKafkaTopicSelector(KafkaTopicSelector selector) {
this.topicSelector = selector;
return this;
}
- public TridentKafkaStateFactory withProducerProperties(Properties
props) {
+ public TridentKafkaStateFactory<K, V>
withProducerProperties(Properties props) {
this.producerProperties = props;
return this;
}
@Override
public State makeState(Map<String, Object> conf, IMetricsContext
metrics, int partitionIndex, int numPartitions) {
LOG.info("makeState(partitonIndex={}, numpartitions={}",
partitionIndex, numPartitions);
- TridentKafkaState state = new TridentKafkaState()
+ TridentKafkaState<K, V> state = new TridentKafkaState<K, V>()
--- End diff --
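You can use the diamond operator `<>` on the right-hand side here, i.e. `new TridentKafkaState<>()`, since the type arguments are inferred from the declared type `TridentKafkaState<K, V>`.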
---