Hello Bill,

We added transform() alongside process() to support user-defined stateful
processors whose output can still be chained to another KStream.

So for your case, would `transform(...).to(topic)` provide the same
functionality as "processTo(topic, ...)"?
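To make the comparison concrete, here is a toy, in-memory model in plain Java (no Kafka dependency) of the two shapes. In one, transform() returns a new stream that to() then terminates. In the other, the proposed processTo() ends in a sink directly. MiniStream and its methods are hypothetical stand-ins, not Kafka Streams classes, and records are modelled as Map.Entry pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.UnaryOperator;

// Toy in-memory model of the two API shapes; MiniStream and its methods are
// hypothetical stand-ins, NOT Kafka Streams classes.
public class ChainingSketch {

    // A "stream" is a list of key/value records plus the topics it can write to.
    static final class MiniStream {
        private final List<Map.Entry<String, String>> records;
        private final Map<String, List<Map.Entry<String, String>>> topics;

        MiniStream(List<Map.Entry<String, String>> records,
                   Map<String, List<Map.Entry<String, String>>> topics) {
            this.records = records;
            this.topics = topics;
        }

        // Like transform(): returns a NEW stream, so the caller can keep chaining.
        MiniStream transform(UnaryOperator<Map.Entry<String, String>> fn) {
            List<Map.Entry<String, String>> out = new ArrayList<>();
            for (Map.Entry<String, String> r : records) {
                Map.Entry<String, String> t = fn.apply(r);
                if (t != null) out.add(t); // a null result drops the record
            }
            return new MiniStream(out, topics);
        }

        // Like to(): terminal, appends every record to the named topic.
        void to(String topic) {
            topics.computeIfAbsent(topic, k -> new ArrayList<>()).addAll(records);
        }

        // Like the proposed processTo(): terminal; whatever the processor
        // "forwards" (adds to the sink list) lands in the topic.
        void processTo(String topic,
                       BiConsumer<Map.Entry<String, String>, List<Map.Entry<String, String>>> processor) {
            List<Map.Entry<String, String>> sink = topics.computeIfAbsent(topic, k -> new ArrayList<>());
            for (Map.Entry<String, String> r : records) processor.accept(r, sink);
        }
    }

    static Map<String, List<Map.Entry<String, String>>> viaTransform(List<Map.Entry<String, String>> input) {
        Map<String, List<Map.Entry<String, String>>> topics = new HashMap<>();
        new MiniStream(input, topics)
                .transform(r -> Map.entry(r.getKey(), r.getValue().toUpperCase()))
                .to("output");
        return topics;
    }

    static Map<String, List<Map.Entry<String, String>>> viaProcessTo(List<Map.Entry<String, String>> input) {
        Map<String, List<Map.Entry<String, String>>> topics = new HashMap<>();
        new MiniStream(input, topics)
                .processTo("output", (r, forward) -> forward.add(Map.entry(r.getKey(), r.getValue().toUpperCase())));
        return topics;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = List.of(Map.entry("a", "hello"), Map.entry("b", "world"));
        System.out.println(viaTransform(input));                             // {output=[a=HELLO, b=WORLD]}
        System.out.println(viaTransform(input).equals(viaProcessTo(input))); // true
    }
}
```

In this model both paths leave the same records in "output". The transform() shape additionally leaves a stream handle, so further steps could be chained before to(). The processTo() shape cannot chain anything after the processor.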

Guozhang


On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck <bbej...@gmail.com> wrote:

> Hi All,
>
> While working with KStream/KStreamImpl I discovered that there does not seem
> to be any way to connect the results of the KStream.process method to a
> sink node.
>
> I'd like to propose adding a "processTo" method to the API.
>
> I've looked at and used the "transform", "reduceByKey" and "aggregateByKey"
> methods, but "processTo" would work as a more general-purpose collector: it
> terminates the KStream and allows writing results to an arbitrary topic
> (regardless of key type).
>
>
> I've done a quick prototype and some initial testing locally on my fork.
> If you think this could be useful, I can add unit tests and create a PR.
> I've included the proposed code changes and the test driver code below:
>
>
> KStream.java additions
>
>     void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                    String... stateStoreNames);
>
>     void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                    Serializer<K> keySerializer, Serializer<V> valSerializer,
>                    String... stateStoreNames);
>
>
> KStreamImpl.java additions
>
>     @Override
>     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                           String... stateStoreNames) {
>         // null serializers mean "use the configured defaults", as with to(topic)
>         processTo(topic, processorSupplier, null, null, stateStoreNames);
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
>                           Serializer<K> keySerializer, Serializer<V> valSerializer,
>                           String... stateStoreNames) {
>         String processorName = topology.newName(PROCESSOR_NAME);
>         String sinkName = topology.newName(SINK_NAME);
>         StreamPartitioner<K, V> streamPartitioner = null;
>
>         // Windowed keys get a WindowedStreamPartitioner
>         // (instanceof is already false for a null serializer).
>         if (keySerializer instanceof WindowedSerializer) {
>             WindowedSerializer<Object> windowedSerializer =
>                     (WindowedSerializer<Object>) keySerializer;
>             streamPartitioner = (StreamPartitioner<K, V>)
>                     new WindowedStreamPartitioner<Object, V>(windowedSerializer);
>         }
>
>         // Wire this stream -> processor -> sink, then attach any state stores.
>         topology.addProcessor(processorName, processorSupplier, this.name);
>         topology.addSink(sinkName, topic, keySerializer, valSerializer,
>                 streamPartitioner, processorName);
>         topology.connectProcessorAndStateStores(processorName, stateStoreNames);
>     }
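The wiring order in processTo() can be checked with a toy topology graph in plain Java. MiniTopology, Node, the "PREFIX-n" naming scheme and the "windowed"/"default" partitioner labels are hypothetical stand-ins for TopologyBuilder and StreamPartitioner, not Kafka APIs. The toy rejects a node whose parent does not exist yet. This is why the processor is added before the sink that reads from it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy topology graph mirroring the wiring order of the proposed processTo().
// MiniTopology and Node are hypothetical, NOT Kafka's TopologyBuilder API.
public class ProcessToWiringSketch {

    static final class Node {
        final List<String> parents;
        final String sinkTopic;    // null for processor nodes
        final String partitioner;  // null for processor nodes
        final List<String> stores = new ArrayList<>();

        Node(List<String> parents, String sinkTopic, String partitioner) {
            this.parents = parents;
            this.sinkTopic = sinkTopic;
            this.partitioner = partitioner;
        }
    }

    static final class MiniTopology {
        final Map<String, Node> nodes = new LinkedHashMap<>();
        final Set<String> stores = new LinkedHashSet<>();
        private int counter = 0;

        // Hypothetical naming scheme: PREFIX-0, PREFIX-1, ...
        String newName(String prefix) {
            return prefix + "-" + counter++;
        }

        void addSource(String name) {
            nodes.put(name, new Node(List.of(), null, null));
        }

        void addStateStore(String store) {
            stores.add(store);
        }

        // A node may only be attached to parents that already exist.
        private void checkParents(String... parents) {
            for (String p : parents) {
                if (!nodes.containsKey(p)) throw new IllegalArgumentException("unknown parent: " + p);
            }
        }

        void addProcessor(String name, String... parents) {
            checkParents(parents);
            nodes.put(name, new Node(Arrays.asList(parents), null, null));
        }

        void addSink(String name, String topic, String partitioner, String... parents) {
            checkParents(parents);
            nodes.put(name, new Node(Arrays.asList(parents), topic, partitioner));
        }

        void connectProcessorAndStateStores(String processor, String... storeNames) {
            for (String s : storeNames) {
                if (!stores.contains(s)) throw new IllegalArgumentException("unknown store: " + s);
                nodes.get(processor).stores.add(s);
            }
        }
    }

    // Same steps, in the same order, as the proposed KStreamImpl.processTo().
    static String processTo(MiniTopology topology, String streamName, String topic,
                            boolean windowedKeySerializer, String... storeNames) {
        String processorName = topology.newName("PROCESSOR");
        String sinkName = topology.newName("SINK");
        String partitioner = windowedKeySerializer ? "windowed" : "default";

        topology.addProcessor(processorName, streamName);              // processor reads this stream
        topology.addSink(sinkName, topic, partitioner, processorName); // sink reads the processor
        topology.connectProcessorAndStateStores(processorName, storeNames);
        return sinkName;
    }

    public static void main(String[] args) {
        MiniTopology topology = new MiniTopology();
        topology.addSource("SOURCE");
        topology.addStateStore("counts");

        String sink = processTo(topology, "SOURCE", "output", false, "counts");
        Node sinkNode = topology.nodes.get(sink);
        // prints: SINK-1 <- [PROCESSOR-0] -> topic output
        System.out.println(sink + " <- " + sinkNode.parents + " -> topic " + sinkNode.sinkTopic);
    }
}
```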
>
>
> Test Driver
>
> import java.util.Properties;
>
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
>
> public class TestDriver {
>
>     public static void main(String[] args) {
>         StreamsConfig config = new StreamsConfig(getProperties());
>         KStreamBuilder kStreamBuilder = new KStreamBuilder();
>
>         KStream<String, String> transactionKStream = kStreamBuilder.stream("input");
>
>         // processTo() is the proposed method: upper-case each value, write to "output"
>         transactionKStream.processTo("output", UpperCaseProcessor::new);
>
>         System.out.println("Starting process-to Example");
>         KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
>         kafkaStreams.start();
>         System.out.println("Now started process-to Example");
>     }
>
>     private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
>         @Override
>         public void process(String key, String value) {
>             context().forward(key, value.toUpperCase());
>             context().commit(); // requests a commit after every record
>         }
>     }
>
>     private static Properties getProperties() {
>         Properties props = new Properties();
>         props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
>         props.put("group.id", "test-streams");
>         props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
>         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>                 WallclockTimestampExtractor.class);
>         return props;
>     }
>
> }
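Unit tests were offered above. One way to exercise the processor logic without a broker is to run it against a stub context that records forwarded records and commit requests. StubContext and MiniProcessor are hypothetical stand-ins for ProcessorContext and AbstractProcessor. This sketch therefore checks only the processing logic, not the Kafka wiring.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Broker-free check of the UpperCaseProcessor logic. StubContext and
// MiniProcessor are hypothetical stand-ins, NOT Kafka Streams classes.
public class UpperCaseProcessorSketch {

    // Records what a processor forwards and how often it asks to commit.
    static final class StubContext {
        final List<Map.Entry<String, String>> forwarded = new ArrayList<>();
        int commitRequests = 0;

        void forward(String key, String value) {
            forwarded.add(Map.entry(key, value));
        }

        void commit() {
            commitRequests++;
        }
    }

    abstract static class MiniProcessor {
        private StubContext context;

        void init(StubContext context) {
            this.context = context;
        }

        StubContext context() {
            return context;
        }

        abstract void process(String key, String value);
    }

    // Same body as the UpperCaseProcessor in the test driver.
    static final class UpperCaseProcessor extends MiniProcessor {
        @Override
        void process(String key, String value) {
            context().forward(key, value.toUpperCase());
            context().commit();
        }
    }

    // Feeds every input record through one processor instance.
    static StubContext run(List<Map.Entry<String, String>> input) {
        StubContext ctx = new StubContext();
        MiniProcessor processor = new UpperCaseProcessor();
        processor.init(ctx);
        for (Map.Entry<String, String> r : input) processor.process(r.getKey(), r.getValue());
        return ctx;
    }

    public static void main(String[] args) {
        StubContext ctx = run(List.of(Map.entry("k1", "abc"), Map.entry("k2", "xyz")));
        System.out.println(ctx.forwarded);      // [k1=ABC, k2=XYZ]
        System.out.println(ctx.commitRequests); // 2
    }
}
```

This sketch surfaces two points about the driver's processor. First, it asks for a commit on every record. Second, value.toUpperCase() throws a NullPointerException for a null value.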
>



-- 
-- Guozhang
