Hi All,
While working with KStream/KStreamImpl I discovered that there does not seem
to be any way to connect the results of the KStream.process method to a
sink node.
I'd like to propose an addition to the API: a "processTo" method.
I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
methods, but "processTo" would work as a more general-purpose collector,
terminating the KStream and allowing the results to be written out to an
arbitrary topic (regardless of key type).
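For comparison, the only way I know of to get the same wiring today is to
drop down to the low-level TopologyBuilder API and connect the nodes by
hand. Roughly (the node names and topics are placeholders, and
UpperCaseProcessor is the example processor from the test driver below):

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source-node", "input");
builder.addProcessor("process-node", UpperCaseProcessor::new, "source-node");
builder.addSink("sink-node", "output", "process-node");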
I've done a quick prototype and some initial testing locally on my fork.
If you think this could be useful, I can add unit tests and create a PR.
I've included the proposed code changes and the test driver code below.
KStream.java additions
void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
               String... stateStoreNames);

void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
               Serializer<K> keySerializer, Serializer<V> valSerializer,
               String... stateStoreNames);
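Call sites would mirror the existing "to" overloads. For example, using the
UpperCaseProcessor from the test driver below (the second call just
illustrates passing explicit serializers instead of the configured defaults):

// default serializers from the StreamsConfig
transactionKStream.processTo("output", UpperCaseProcessor::new);

// explicit serializers for the sink
transactionKStream.processTo("output", UpperCaseProcessor::new,
        new StringSerializer(), new StringSerializer());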
KStreamImpl.java additions
@Override
public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
                      String... stateStoreNames) {
    // Null serializers mean "use the defaults from the StreamsConfig".
    processTo(topic, processorSupplier, null, null, stateStoreNames);
}

@SuppressWarnings("unchecked")
@Override
public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
                      Serializer<K> keySerializer, Serializer<V> valSerializer,
                      String... stateStoreNames) {
    String processorName = topology.newName(PROCESSOR_NAME);
    String sinkName = topology.newName(SINK_NAME);

    // For windowed keys, partition on the underlying key so the sink stays
    // co-partitioned with the windowed aggregate that produced the stream.
    StreamPartitioner<K, V> streamPartitioner = null;
    if (keySerializer instanceof WindowedSerializer) {
        WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
        streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
    }

    topology.addProcessor(processorName, processorSupplier, this.name);
    topology.addSink(sinkName, topic, keySerializer, valSerializer, streamPartitioner, processorName);
    topology.connectProcessorAndStateStores(processorName, stateStoreNames);
}
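The stateStoreNames varargs behave like the ones on process() and transform():
the store has to be registered with the builder first, and
connectProcessorAndStateStores() wires it to the new processor node. A
hypothetical sketch ("my-store" is made up, and I'm quoting the Stores
factory API from memory):

// Register an in-memory store with the builder, then hand its name to
// processTo. A real processor would fetch it via
// context().getStateStore("my-store"); UpperCaseProcessor is just here
// to show the wiring.
StateStoreSupplier store = Stores.create("my-store")
        .withStringKeys()
        .withStringValues()
        .inMemory()
        .build();
kStreamBuilder.addStateStore(store);
transactionKStream.processTo("output", UpperCaseProcessor::new, "my-store");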
Test Driver
import java.util.Properties;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class TestDriver {

    public static void main(String[] args) {
        StreamsConfig config = new StreamsConfig(getProperties());
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, String> transactionKStream = kStreamBuilder.stream("input");

        // Run each record through UpperCaseProcessor, then sink to "output".
        transactionKStream.processTo("output", UpperCaseProcessor::new);

        System.out.println("Starting process-to Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started process-to Example");
    }

    private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            context().forward(key, value.toUpperCase());
            context().commit(); // commit after every record; fine for a demo
        }
    }

    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
        props.put("group.id", "test-streams");
        props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
        return props;
    }
}
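To try the driver, start it with a broker running at the addresses in the
config above and produce a few strings to the "input" topic; the upper-cased
values should appear on "output".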