Hi Alessandro, Have you considered using `transform()` (actually in your case you should use `transformValues()`) instead of `.process()`? `transform()` and `transformValues()` are stateful operations similar to `.process` but they return a `KStream`. On a `KStream` you can then apply a windowed aggregation.
Hope that helps. Best, Bruno On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra < tagliapietra.alessan...@gmail.com> wrote: > Hi there, > > I'm just starting with Kafka and I'm trying to create a stream processor > that in multiple stages: > - filters messages using a kv store so that only messages with higher > timestamp gets processed > - aggregates the message metrics by minute giving e.g. the avg of those > metrics in that minute > > The message is simple, the key is the sensor ID and the value is e.g. { > timestamp: UNIX_TIMESTAMP, speed: INT }. > > I've started by creating a processor to use the kv store and filter old > messages: > > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java > > Then I was trying to implement windowing, I saw very nice windowing > examples for the DSL but none for the Processor API (only a small reference > to the windowed store), can someone point me in the right direction? > > Now, since I wasn't able to find any example I tried to use the DSL but > haven't found a way to use my processor with it, I saw this > > https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration > but > it explains mostly transformers not processors. I also saw after that the > example usage of the processor but `.process(...)` returns void, so I > cannot have a KStream from a processor? > > Thank you all in advance > > -- > Alessandro Tagliapietra >