Thanks Bill for the detailed description of your use case.

I think part of the issue is that today KStream does not allow non-keyed
aggregation, so users need to call `map` first if they want to aggregate on
other fields. We did this by design for KStream: if users wrote a processor
of their own for this case, they would also need to manually extract the
aggregation key while updating the state store, which is exactly what `map`
does, so making it explicit helps guide the programming. This `map` call
does not add any overhead in terms of storage; all it does is extract the
key for use by the next processor.
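
As a rough illustration of this point, here is a minimal plain-Java sketch
with no Kafka dependency. The `Purchase` type, its fields, and the class and
method names are all hypothetical and only model the idea: the re-keying step
computes a new key per record and stores nothing, while the keyed aggregation
is the only place that holds state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RekeyThenAggregate {

    // Hypothetical record: arrives keyed by userId, but totals are wanted per category.
    static final class Purchase {
        final String userId;
        final String category;
        final long amount;

        Purchase(String userId, String category, long amount) {
            this.userId = userId;
            this.category = category;
            this.amount = amount;
        }
    }

    // The "map" step: extract the aggregation key. Nothing is stored here.
    static Map.Entry<String, Long> rekey(Purchase p) {
        return new SimpleEntry<>(p.category, p.amount);
    }

    // The keyed aggregation: the only step that accumulates state.
    static Map<String, Long> totalsByCategory(List<Purchase> purchases) {
        Map<String, Long> totals = new TreeMap<>();
        for (Purchase p : purchases) {
            Map.Entry<String, Long> e = rekey(p);
            totals.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Purchase> input = Arrays.asList(
                new Purchase("u1", "books", 10L),
                new Purchase("u2", "books", 5L),
                new Purchase("u1", "games", 7L));
        System.out.println(totalsByCategory(input)); // prints {books=15, games=7}
    }
}
```

A hand-written processor would have to perform the same key extraction inside
its own state-update logic, which is the reason for keeping it as a separate,
explicit step.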

As for your proposed change, I think we need to be a bit careful about the
return type if we want process() to return a KStream<>, since its key and
value types would not be those of the input KStream<K, V> but rather
KStream<K1, V1> in your case. The process() function itself has no way to
enforce typing on the returned value.
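
The typing concern can be sketched with simplified stand-ins for the
interfaces. These are hypothetical and are not the actual Kafka Streams
declarations; `ToyStream`, `lengthKeyed`, and the in-memory record list exist
only for this illustration. A processor whose `process` returns void names
only its input types, so a stream-returning `process(...)` would have nothing
to derive `K1, V1` from, whereas a transformer-style interface carries its
result type as a type parameter.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ReturnTypeSketch {

    // Simplified stand-in: only the input types K and V appear in the signature.
    interface Processor<K, V> {
        void process(K key, V value);
    }

    // Simplified stand-in: the result type R is part of the signature.
    interface Transformer<K, V, R> {
        R transform(K key, V value);
    }

    // Toy in-memory stream used only to show how the types flow.
    static class ToyStream<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        ToyStream<K, V> add(K key, V value) {
            records.add(new SimpleEntry<>(key, value));
            return this;
        }

        // K1 and V1 are fixed by the transformer's declared result type,
        // so the compiler can check the downstream stream's types.
        <K1, V1> ToyStream<K1, V1> transform(Transformer<K, V, Map.Entry<K1, V1>> t) {
            ToyStream<K1, V1> out = new ToyStream<>();
            for (Map.Entry<K, V> r : records) {
                Map.Entry<K1, V1> e = t.transform(r.getKey(), r.getValue());
                out.add(e.getKey(), e.getValue());
            }
            return out;
        }

        // A void-returning processor exposes no output types, so this method
        // has nothing type-safe to return and stays terminal.
        void process(Processor<K, V> p) {
            for (Map.Entry<K, V> r : records) {
                p.process(r.getKey(), r.getValue());
            }
        }
    }

    // Re-keys each record by the value's length; the output types differ from the input types.
    static ToyStream<Integer, String> lengthKeyed(ToyStream<String, String> in) {
        return in.<Integer, String>transform(
                (k, v) -> new SimpleEntry<>(v.length(), v.toUpperCase()));
    }

    public static void main(String[] args) {
        ToyStream<String, String> in = new ToyStream<String, String>().add("a", "hello");
        System.out.println(lengthKeyed(in).records);
    }
}
```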

Guozhang



On Sun, Mar 13, 2016 at 3:38 PM, Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Guozhang,
>
> Possibly, but the use case I'm working with is having a collector
> object, for aggregate statistics for example, that would output results
> intermittently (via punctuate).
>
> The issue for me is that 'transform(..)'  returns a key-value pair for each
> message, possibly of a different type.
>
> I've achieved something similar in the KStream api using the form of
>  map(...).aggregateByKey(....).to(...)  but using that approach I need to
> map each message to an intermediate form and do the periodic aggregations
> of "stats" objects.
>
> What I'd really like is a way to attach a sink to a processor.
>
> With that in mind, instead of introducing a "processTo" method, another
> option could be to change the return type of "process" from void to
> KStream<K,V>.
>
> Then the use case becomes 'process(..).to(...)', similar to
> 'transform(..).to(..)'.
>
> I've made those changes locally; everything compiles fine, and running my
> simple driver program achieves the desired results.
>
> I know I could be splitting hairs here,  but in my opinion, it would be
> nice to have.
>
> Thanks for your time!
>
> Bill
>
>
> On Sun, Mar 13, 2016 at 4:28 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Bill,
> >
> > We added transform() together with process() to support any
> > user-customized stateful processor that can still concatenate to another
> > KStream.
> >
> > So for your case, would `transform(...).to(topic)` provide the same
> > functionality as "processTo(topic, ...)"?
> >
> > Guozhang
> >
> >
> > On Sat, Mar 12, 2016 at 12:20 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > Hi All,
> > >
> > > While working with KStream/KStreamImpl I discovered that there does not
> > > seem to be any way to connect the results of the KStream.process method
> > > with a sink node.
> > >
> > > I'd like to propose an addition to the API: a "processTo" method.
> > >
> > > I've looked at and used the "transform", "reduceByKey" and
> > > "aggregateByKey" methods, but "processTo" would work like a more
> > > general-purpose collector, terminating the KStream and allowing results
> > > to be written out to an arbitrary topic (regardless of key type).
> > >
> > >
> > > I've done a quick prototype and some initial testing locally on my
> > > fork. If you think this could be useful I can add unit tests and create
> > > a PR. I've included the proposed code changes and the test driver code
> > > below.
> > >
> > >
> > > KStream.java additions
> > >
> > > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> > >                String... stateStoreNames);
> > >
> > > void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> > >                Serializer<K> keySerializer, Serializer<V> valSerializer,
> > >                String... stateStoreNames);
> > >
> > >
> > > KStreamImpl.java additions
> > >
> > >     @Override
> > >     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> > >                           String... stateStoreNames) {
> > >         processTo(topic, processorSupplier, null, null, stateStoreNames);
> > >     }
> > >
> > >     @SuppressWarnings("unchecked")
> > >     @Override
> > >     public void processTo(String topic, ProcessorSupplier<K, V> processorSupplier,
> > >                           Serializer<K> keySerializer, Serializer<V> valSerializer,
> > >                           String... stateStoreNames) {
> > >         String processorName = topology.newName(PROCESSOR_NAME);
> > >         String sinkName = topology.newName(SINK_NAME);
> > >         StreamPartitioner<K, V> streamPartitioner = null;
> > >
> > >         if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
> > >             WindowedSerializer<Object> windowedSerializer =
> > >                 (WindowedSerializer<Object>) keySerializer;
> > >             streamPartitioner = (StreamPartitioner<K, V>)
> > >                 new WindowedStreamPartitioner<Object, V>(windowedSerializer);
> > >         }
> > >
> > >         topology.addProcessor(processorName, processorSupplier, this.name);
> > >         topology.addSink(sinkName, topic, keySerializer, valSerializer,
> > >                          streamPartitioner, processorName);
> > >         topology.connectProcessorAndStateStores(processorName, stateStoreNames);
> > >     }
> > >
> > >
> > > Test Driver
> > >
> > > public class TestDriver {
> > >
> > >     public static void main(String[] args) {
> > >         StreamsConfig config = new StreamsConfig(getProperties());
> > >         KStreamBuilder kStreamBuilder = new KStreamBuilder();
> > >
> > >         KStream<String, String> transactionKStream = kStreamBuilder.stream("input");
> > >
> > >         transactionKStream.processTo("output", UpperCaseProcessor::new);
> > >
> > >         System.out.println("Starting process-to Example");
> > >         KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
> > >         kafkaStreams.start();
> > >         System.out.println("Now started process-to Example");
> > >     }
> > >
> > >     private static class UpperCaseProcessor extends AbstractProcessor<String, String> {
> > >         @Override
> > >         public void process(String key, String value) {
> > >             context().forward(key, value.toUpperCase());
> > >             context().commit();
> > >         }
> > >     }
> > >
> > >     private static Properties getProperties() {
> > >         Properties props = new Properties();
> > >         props.put(StreamsConfig.CLIENT_ID_CONFIG, "Process-to-test");
> > >         props.put("group.id", "test-streams");
> > >         props.put(StreamsConfig.JOB_ID_CONFIG, "processor_to_test");
> > >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
> > >         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
> > >         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
> > >         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> > >         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> > >         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > >                   WallclockTimestampExtractor.class);
> > >         return props;
> > >     }
> > >
> > > }
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
