Hi Alessandro,

Have a look at this Kafka usage pattern for computing averages without using an ArrayList:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?

The advantage of this pattern over the ArrayList approach is the reduced space needed to compute the aggregate. Note that you will still need to implement a SerDe. However, the SerDe should be a bit easier to implement than a SerDe for an ArrayList.

Hope that helps.

Best,
Bruno

On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:

> Sorry, but it seemed harder than I thought.
>
> To have the custom aggregation working I need to get an ArrayList of all
> the values in the window. So far my aggregate DSL method creates an
> ArrayList in the initializer and adds each value to the list in the
> aggregator. Then I think I'll have to provide a serde to change the
> output type of that method.
>
> I was looking at
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> but that seems geared more towards a list of longs and already uses a
> long serde. I'm currently trying to implement another Avro model that
> has a field of type array so I can use the regular Avro serializer for
> this. Should I create my own serdes instead, or is this the right way?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
>
> > Thank you Bruno and Matthias,
> >
> > I've modified the transformer to implement the ValueTransformerWithKey
> > interface and everything is working fine. I now have to window the
> > data and manually aggregate each window's data, since I have to
> > compute some averages and sums of differences. So far I'm just having
> > some issues with message types, since I'm changing the data type when
> > aggregating the window, but I think it's an easy problem.
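[Editor's note: the count-and-sum pattern from the wiki page Bruno links can be sketched in plain Java as below. The class name `CountAndSum`, the `getSpeed()` accessor, and `countAndSumSerde` in the comment are illustrative, not from the thread; the DSL wiring in the comment is a sketch, not a drop-in topology.]

```java
// Sketch of the count-and-sum pattern from the wiki page linked above.
// The aggregate keeps only a running count and sum, not an ArrayList of
// values, so its size stays constant no matter how many records fall
// into the window.
import java.nio.ByteBuffer;

public class CountAndSumSketch {

    static final class CountAndSum {
        long count;
        double sum;

        void add(double value) {
            count++;
            sum += value;
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }

        // The serde Bruno mentions is straightforward for this type:
        // a fixed 16-byte layout, 8 bytes for the count, 8 for the sum.
        byte[] toBytes() {
            return ByteBuffer.allocate(16).putLong(count).putDouble(sum).array();
        }

        static CountAndSum fromBytes(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            CountAndSum cas = new CountAndSum();
            cas.count = buf.getLong();
            cas.sum = buf.getDouble();
            return cas;
        }
    }

    // In the Streams DSL the same logic would plug in roughly as:
    //
    //   stream.groupByKey()
    //         .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    //         .aggregate(CountAndSum::new,
    //                    (key, value, agg) -> { agg.add(value.getSpeed()); return agg; },
    //                    Materialized.with(Serdes.String(), countAndSumSerde))
    //         .mapValues((windowedKey, agg) -> agg.average());

    public static void main(String[] args) {
        CountAndSum agg = new CountAndSum();
        for (double speed : new double[] {10.0, 20.0, 30.0}) {
            agg.add(speed);
        }
        System.out.println(agg.average());                       // 20.0

        CountAndSum restored = CountAndSum.fromBytes(agg.toBytes());
        System.out.println(restored.count + " " + restored.sum); // 3 60.0
    }
}
```

The round-trip through `toBytes`/`fromBytes` is exactly what a Kafka `Serializer`/`Deserializer` pair would wrap, and it avoids the variable-length encoding an ArrayList serde needs.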
> >
> > Thank you again
> > Best
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Alessandro,
> >>
> >> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
> >> so the statement
> >>
> >> `transform` is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to your processor topology
> >>
> >> is correct.
> >>
> >> If you do not change the key, you should definitely use one of the
> >> overloads of `transformValues` to avoid internal data redistribution.
> >> In your case, the overload with `ValueTransformerWithKeySupplier`, as
> >> suggested by Matthias, would fit.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >> > There is also `ValueTransformerWithKey` that gives you read-only access
> >> > to the key.
> >> >
> >> > -Matthias
> >> >
> >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > Hi Bruno,
> >> > >
> >> > > Thank you for the quick answer.
> >> > >
> >> > > I'm actually trying to do that, since it seems there is really no
> >> > > way to have it use `Processor<K, V>`. I just wanted (if that
> >> > > would've made any sense) to use the Processor in both DSL and
> >> > > non-DSL pipelines.
> >> > >
> >> > > Anyway, regarding `transformValues()`, I don't think I can use it,
> >> > > as I need the message key, since that is the discriminating value
> >> > > for the filter (I want to exclude old values per sensor ID, i.e.
> >> > > per message key).
> >> > >
> >> > > Right now I have this
> >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > and I'm using it with `transform()`.
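[Editor's note: the per-key timestamp filter discussed in this exchange can be sketched in plain Java as below. A `HashMap` stands in for the Kafka Streams `KeyValueStore`; in the real topology this logic would live in a `ValueTransformerWithKey` attached with `stream.transformValues(supplier, "store-name")`, which, as Bruno notes, avoids the repartitioning `transform()` can trigger because the key is only read, never changed. All names here are illustrative.]

```java
// Plain-Java sketch of the per-key timestamp filter. The transform()
// method mirrors ValueTransformerWithKey#transform(readOnlyKey, value):
// it returns the value when its timestamp is newer than the last one
// accepted for that sensor, or null so a downstream
// filter((k, v) -> v != null) can drop the stale record.
import java.util.HashMap;
import java.util.Map;

public class TimestampFilterSketch {

    // Stand-in for the state store: last accepted timestamp per sensor ID.
    private final Map<String, Long> lastSeen = new HashMap<>();

    Long transform(String sensorId, Long timestamp) {
        Long previous = lastSeen.get(sensorId);
        if (previous != null && timestamp <= previous) {
            return null; // old value for this sensor, drop it
        }
        lastSeen.put(sensorId, timestamp);
        return timestamp;
    }

    public static void main(String[] args) {
        TimestampFilterSketch filter = new TimestampFilterSketch();
        System.out.println(filter.transform("sensor-1", 100L)); // 100
        System.out.println(filter.transform("sensor-1", 90L));  // null
        System.out.println(filter.transform("sensor-1", 110L)); // 110
        System.out.println(filter.transform("sensor-2", 50L));  // 50
    }
}
```

Because the key is only consulted, never rewritten, this is exactly the read-only access `ValueTransformerWithKey` provides, so `transformValues` fits and no internal redistribution is needed.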
> >> > >
> >> > > One thing I've found confusing is this:
> >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > >
> >> > >> transform is essentially equivalent to adding the Transformer via
> >> > >> Topology#addProcessor() to your processor topology
> >> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >> > >
> >> > > Is it? Doesn't `transform` need a TransformerSupplier while
> >> > > `addProcessor` uses a ProcessorSupplier?
> >> > >
> >> > > Thank you again for your help
> >> > >
> >> > > --
> >> > > Alessandro Tagliapietra
> >> > >
> >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
> >> > >
> >> > >> Hi Alessandro,
> >> > >>
> >> > >> Have you considered using `transform()` (actually, in your case
> >> > >> you should use `transformValues()`) instead of `process()`?
> >> > >> `transform()` and `transformValues()` are stateful operations
> >> > >> similar to `process()`, but they return a `KStream`. On a
> >> > >> `KStream` you can then apply a windowed aggregation.
> >> > >>
> >> > >> Hope that helps.
> >> > >>
> >> > >> Best,
> >> > >> Bruno
> >> > >>
> >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <tagliapietra.alessan...@gmail.com> wrote:
> >> > >>
> >> > >>> Hi there,
> >> > >>>
> >> > >>> I'm just starting with Kafka and I'm trying to create a stream
> >> > >>> processor that works in multiple stages:
> >> > >>> - filters messages using a kv store so that only messages with a
> >> > >>> higher timestamp get processed
> >> > >>> - aggregates the message metrics by minute, giving e.g. the avg
> >> > >>> of those metrics in that minute
> >> > >>>
> >> > >>> The message is simple: the key is the sensor ID and the value is e.g.
> >> > >>> { timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > >>>
> >> > >>> I've started by creating a processor that uses the kv store to
> >> > >>> filter out old messages:
> >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > >>>
> >> > >>> Then I was trying to implement windowing. I saw very nice
> >> > >>> windowing examples for the DSL but none for the Processor API
> >> > >>> (only a small reference to the windowed store); can someone
> >> > >>> point me in the right direction?
> >> > >>>
> >> > >>> Now, since I wasn't able to find any example, I tried to use the
> >> > >>> DSL but haven't found a way to use my processor with it. I saw
> >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > >>> but it explains mostly transformers, not processors. After that
> >> > >>> I also saw the example usage of the processor, but
> >> > >>> `process(...)` returns void, so I cannot get a KStream from a
> >> > >>> processor?
> >> > >>>
> >> > >>> Thank you all in advance
> >> > >>>
> >> > >>> --
> >> > >>> Alessandro Tagliapietra
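[Editor's note: the per-minute windowing asked about above can be sketched in plain Java as below. This is the bucketing that `windowedBy(TimeWindows.of(Duration.ofMinutes(1)))` performs in the DSL; in the Processor API the same buckets would be kept in a `WindowStore` and emitted from a punctuator. Timestamps are epoch millis; all names here are illustrative.]

```java
// Sketch of the per-minute bucketing behind a 1-minute windowed average.
// Each record lands in the bucket whose start is its timestamp rounded
// down to the minute boundary; each bucket keeps only a running count
// and sum, from which the average is derived.
import java.util.Map;
import java.util.TreeMap;

public class MinuteWindowSketch {

    private static final long WINDOW_MS = 60_000L;

    // Window start = timestamp rounded down to the minute boundary.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Per-window running aggregate: {count, sum}. TreeMap keeps the
    // windows in time order, like a range scan over a WindowStore would.
    private final Map<Long, long[]> buckets = new TreeMap<>();

    void add(long timestampMs, long speed) {
        long[] bucket = buckets.computeIfAbsent(windowStart(timestampMs), k -> new long[2]);
        bucket[0]++;
        bucket[1] += speed;
    }

    public static void main(String[] args) {
        MinuteWindowSketch windows = new MinuteWindowSketch();
        windows.add(5_000L, 10);  // minute starting at 0
        windows.add(59_000L, 30); // same minute
        windows.add(61_000L, 50); // next minute, starting at 60000
        windows.buckets.forEach((start, b) ->
            System.out.println(start + " avg=" + ((double) b[1] / b[0])));
        // prints:
        // 0 avg=20.0
        // 60000 avg=50.0
    }
}
```

This also shows why `process()` being terminal matters less than it seems: once the filter is expressed as a transformer returning a `KStream`, the windowed aggregation above is just the next DSL step rather than hand-rolled Processor API code.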