Hi Alessandro,

Have a look at this Kafka Usage Pattern for computing averages without
using an ArrayList.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?

The advantage of this pattern over the ArrayList approach is the reduced
space needed to compute the aggregate: it keeps only a running count and
sum rather than every value in the window. Note that you will still need
to implement a SerDe. However, that SerDe should be a bit easier to
implement than a SerDe for an ArrayList.
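
A minimal sketch of the idea, assuming the values being averaged are longs.
The class name `CountAndSum` and its methods are hypothetical and not part
of Kafka Streams; the two byte-array methods only show the logic a custom
Serializer/Deserializer pair could delegate to.

```java
import java.nio.ByteBuffer;

/**
 * Running aggregate for an average: two longs instead of a list of values.
 * Hypothetical helper, not part of the Kafka Streams API.
 */
final class CountAndSum {
    private final long count;
    private final long sum;

    /** Initializer step: an empty aggregate. */
    CountAndSum() {
        this(0L, 0L);
    }

    CountAndSum(long count, long sum) {
        this.count = count;
        this.sum = sum;
    }

    /** Aggregator step: fold one more value into the running totals. */
    CountAndSum add(long value) {
        return new CountAndSum(count + 1, sum + value);
    }

    long count() {
        return count;
    }

    long sum() {
        return sum;
    }

    /** Average of the values seen so far; 0.0 for an empty aggregate. */
    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }

    /** Fixed 16-byte encoding: count first, then sum. */
    byte[] toBytes() {
        return ByteBuffer.allocate(2 * Long.BYTES)
                .putLong(count)
                .putLong(sum)
                .array();
    }

    /** Inverse of toBytes(). */
    static CountAndSum fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long count = buf.getLong();
        long sum = buf.getLong();
        return new CountAndSum(count, sum);
    }
}
```

In the DSL, `CountAndSum::new` would presumably serve as the initializer and
`(key, value, agg) -> agg.add(value)` as the aggregator passed to
`aggregate()`, followed by a `mapValues()` that calls `average()`.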

Hope that helps.

Best,
Bruno

On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Sorry, but this turned out to be harder than I thought.
>
> To get the custom aggregation working I need an ArrayList of all the
> values in the window. So far my aggregate DSL method creates an
> ArrayList in the initializer and adds each value to the list in the
> aggregator.
> Then I think I'll have to provide a serde to change the output type of
> that method.
> I was looking at
>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>
> but that seems geared more towards a list of longs and already uses
> longSerde.
> I'm currently trying to implement another Avro model that has a field of
> type array so I can use the regular Avro serializer to implement this.
> Should I create my own serdes instead, or is this the right way?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Thank you Bruno and Matthias,
> >
> > I've modified the transformer to implement the ValueTransformerWithKey
> > interface and everything is working fine.
> > I now have to window the data and manually aggregate each window, since
> > I have to compute some averages and sums of differences.
> > So far I'm just having some issues with message types, since I'm changing
> > the data type when aggregating the window, but I think it's an easy
> > problem.
> >
> > Thank you again
> > Best
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:
> >
> >> Hi Alessandro,
> >>
> >> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
> >> so the statement
> >>
> >> `transform` is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to your processor topology
> >>
> >> is correct.
> >>
> >> If you do not change the key, you should definitely use one of the
> >> overloads of `transformValues` to avoid internal data redistribution. In
> >> your case the overload with `ValueTransformerWithKeySupplier`, as
> >> suggested by Matthias, would fit.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >> > There is also `ValueTransformerWithKey` that gives you read-only access
> >> > to the key.
> >> >
> >> > -Matthias
> >> >
> >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > Hi Bruno,
> >> > >
> >> > > Thank you for the quick answer.
> >> > >
> >> > > I'm actually trying to do that since it seems there is really no way
> >> > > to have it use `Processor<K, V>`.
> >> > > I just wanted (if that would've made any sense) to use the Processor
> >> > > in both DSL and non-DSL pipelines.
> >> > >
> >> > > Anyway, regarding `transformValues()`, I don't think I can use it as
> >> > > I need the message key, since that is the discriminating value for
> >> > > the filter (I want to exclude old values per sensor ID, so per
> >> > > message key).
> >> > >
> >> > > Right now I have this
> >> > >
> >> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > >
> >> > > and I'm using it with `transform()`.
> >> > >
> >> > > One thing I've found confusing is this
> >> > >
> >> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > >
> >> > >> transform is essentially equivalent to adding the Transformer via
> >> > >> Topology#addProcessor() to your processor topology
> >> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> >> > >
> >> > >
> >> > > Is it? Doesn't `transform` need a `TransformerSupplier` while
> >> > > `addProcessor` uses a `ProcessorSupplier`?
> >> > >
> >> > > Thank you again for your help
> >> > >
> >> > > --
> >> > > Alessandro Tagliapietra
> >> > >
> >> > >
> >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
> >> > >
> >> > >> Hi Alessandro,
> >> > >>
> >> > >> Have you considered using `transform()` (actually in your case you
> >> > >> should use `transformValues()`) instead of `.process()`? `transform()`
> >> > >> and `transformValues()` are stateful operations similar to `.process()`,
> >> > >> but they return a `KStream`. On a `KStream` you can then apply a
> >> > >> windowed aggregation.
> >> > >>
> >> > >> Hope that helps.
> >> > >>
> >> > >> Best,
> >> > >> Bruno
> >> > >>
> >> > >>
> >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> >> > >> tagliapietra.alessan...@gmail.com> wrote:
> >> > >>
> >> > >>> Hi there,
> >> > >>>
> >> > >>> I'm just starting with Kafka and I'm trying to create a stream
> >> > >>> processor that works in multiple stages:
> >> > >>>  - filters messages using a kv store so that only messages with a
> >> > >>>    higher timestamp get processed
> >> > >>>  - aggregates the message metrics by minute, giving e.g. the avg of
> >> > >>>    those metrics in that minute
> >> > >>>
> >> > >>> The message is simple: the key is the sensor ID and the value is
> >> > >>> e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > >>>
> >> > >>> I've started by creating a processor to use the kv store and filter
> >> > >>> old messages:
> >> > >>>
> >> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > >>>
> >> > >>> Then I tried to implement windowing. I saw very nice windowing
> >> > >>> examples for the DSL but none for the Processor API (only a small
> >> > >>> reference to the windowed store). Can someone point me in the right
> >> > >>> direction?
> >> > >>>
> >> > >>> Now, since I wasn't able to find any example, I tried to use the DSL
> >> > >>> but haven't found a way to use my processor with it. I saw this
> >> > >>>
> >> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > >>>
> >> > >>> but it mostly explains transformers, not processors. After that I
> >> > >>> also saw the example usage of the processor, but `.process(...)`
> >> > >>> returns void, so I cannot get a KStream from a processor?
> >> > >>>
> >> > >>> Thank you all in advance
> >> > >>>
> >> > >>> --
> >> > >>> Alessandro Tagliapietra
> >> > >>>
> >> > >>
> >> > >
> >> >
> >> >
> >>
> >
>
