Hello Bill,

We added transform() alongside process() to support user-customized stateful processors whose output can still be chained into another KStream.
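To make the difference in return types concrete, here is a minimal, self-contained sketch. It deliberately does not use the Kafka Streams classes: SketchStream, its in-memory topic map, and upperCaseToOutput() are hypothetical stand-ins invented for illustration. It only models the point that process() is terminal (returns void) while transform() returns a stream that can be followed by to(topic).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

public class TransformToSketch {

    // Hypothetical stand-in for a stream of records; NOT the real KStream interface.
    static final class SketchStream<K, V> {
        private final List<Map.Entry<K, V>> records;
        private final Map<String, List<String>> topics; // in-memory "topics" for the sketch

        SketchStream(List<Map.Entry<K, V>> records, Map<String, List<String>> topics) {
            this.records = records;
            this.topics = topics;
        }

        // Terminal, like process(): returns void, so nothing can be chained after it.
        void process(BiConsumer<K, V> processor) {
            for (Map.Entry<K, V> r : records) {
                processor.accept(r.getKey(), r.getValue());
            }
        }

        // Non-terminal, like transform(): returns a new stream of the emitted records.
        <K1, V1> SketchStream<K1, V1> transform(BiFunction<K, V, Map.Entry<K1, V1>> transformer) {
            List<Map.Entry<K1, V1>> out = new ArrayList<>();
            for (Map.Entry<K, V> r : records) {
                out.add(transformer.apply(r.getKey(), r.getValue()));
            }
            return new SketchStream<>(out, topics);
        }

        // Sink, like to(topic): appends each record as "key=value" to the named topic.
        void to(String topic) {
            List<String> sink = topics.computeIfAbsent(topic, t -> new ArrayList<>());
            for (Map.Entry<K, V> r : records) {
                sink.add(r.getKey() + "=" + r.getValue());
            }
        }
    }

    // Builds a two-record input stream, upper-cases the values, and writes them to "output".
    static List<String> upperCaseToOutput() {
        Map<String, List<String>> topics = new HashMap<>();
        List<Map.Entry<String, String>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("k1", "hello"));
        input.add(new SimpleEntry<>("k2", "world"));

        new SketchStream<>(input, topics)
            .transform((k, v) -> new SimpleEntry<String, String>(k, v.toUpperCase()))
            .to("output");
        return topics.get("output");
    }

    public static void main(String[] args) {
        System.out.println(upperCaseToOutput()); // prints [k1=HELLO, k2=WORLD]
    }
}
```

In this toy model the upper-casing step plays the role of the custom processor, and chaining to("output") after transform() covers what a combined process-and-sink method would otherwise do.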
So for your case, would `transform(...).to(topic)` provide the same functionality as "processTo(topic, ...)"?

Guozhang

On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck <[email protected]> wrote:

> Hi All,
>
> While working with KStream/KStreamImp I discovered that there does not seem
> to be any way to connect the results of the KStream.process method with a
> sink node.
>
> I'd like to propose an addition to the API a "processTo" method.
>
> I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
> methods, but "processTo" would work like a more general purpose collector
> terminating the KStream and allow for writing out results to an arbitrary
> topic (regardless of key type).
>
>
> I've done a quick prototype and some initial testing locally on my fork.
> If you think this could be useful I can add unit tests and create a PR.
> I've included the proposed code changes and the test driver code below
>
>
> KStream.java additions
>
>     void processTo(String topic, ProcessorSupplier<K,V> processorSupplier,
>                    String... stateStoreNames);
>
>     void processTo(String topic, ProcessorSupplier<K,V> processorSupplier,
>                    Serializer<K> keySerializer, Serializer<V> valSerializer,
>                    String... stateStoreNames);
>
>
> KStreamImpl.java additions
>
>     @Override
>     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                           String... stateStoreNames) {
>         processTo(topic, processorSupplier, null, null, stateStoreNames);
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                           Serializer<K> keySerializer, Serializer<V> valSerializer,
>                           String... stateStoreNames) {
>         String processorName = topology.newName(PROCESSOR_NAME);
>         String sinkName = topology.newName(SINK_NAME);
>         StreamPartitioner<K, V> streamPartitioner = null;
>
>         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
>             WindowedSerializer<Object> windowedSerializer =
>                     (WindowedSerializer<Object>) keySerializer;
>             streamPartitioner = (StreamPartitioner<K, V>)
>                     new WindowedStreamPartitioner<Object, V>(windowedSerializer);
>         }
>
>         topology.addProcessor(processorName, processorSupplier, this.name);
>         topology.addSink(sinkName, topic, keySerializer, valSerializer,
>                          streamPartitioner, processorName);
>         topology.connectProcessorAndStateStores(processorName, stateStoreNames);
>     }
>
>
> Test Driver
>
>     public class TestDriver {
>
>         public static void main(String[] args) {
>             StreamsConfig config = new StreamsConfig(getProperties());
>             KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
>             KStream<String,String> transactionKStream = kStreamBuilder.stream("input");
>
>             transactionKStream.processTo("output", UpperCaseProcessor::new);
>
>             System.out.println("Starting process-to Example");
>             KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>             kafkaStreams.start();
>             System.out.println("Now started process-to Example");
>         }
>
>         private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
>             @Override
>             public void process(String key, String value) {
>                 context().forward(key, value.toUpperCase());
>                 context().commit();
>             }
>         }
>
>         private static Properties getProperties() {
>             Properties props = new Properties();
>             props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
>             props.put("group.id", "test-streams");
>             props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
>             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>             props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>             props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
>             props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>             props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>             props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>             props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>             props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>                       WallclockTimestampExtractor.class);
>             return props;
>         }
>
>     }
>

--
-- Guozhang
