Thank you Bruno and Matthias,

I've modified the transformer to implement the ValueTransformerWithKey
interface and everything is working fine.
I now have to window the data and manually aggregate each window, since
I need to compute some averages and sums of differences.
So far I'm just having some issues with message types, since I change
the data type when aggregating the window, but I think it's an easy problem.
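
To make the aggregation step concrete, here is a minimal sketch in plain Java with no Kafka Streams API involved. Every name in it (`MinuteWindowSketch`, `Reading`, `WindowStats`, `aggregate`) is hypothetical and only for illustration. It assumes timestamps are in epoch seconds, that the windows are one-minute tumbling windows, and that "sum of differences" means the sum of differences between consecutive speed readings in arrival order. It also shows why the value type changes: the input is a `Reading`, the output is a `WindowStats`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MinuteWindowSketch {
    // Input value type (hypothetical): one sensor reading, timestamp in epoch seconds.
    static class Reading {
        final long timestamp;
        final int speed;
        Reading(long timestamp, int speed) {
            this.timestamp = timestamp;
            this.speed = speed;
        }
    }

    // Output value type (hypothetical): differs from the input type, so a real
    // topology would need a separate serde for the aggregated values.
    static class WindowStats {
        final long windowStart;
        final int count;
        final double avgSpeed;
        final long sumOfDiffs;
        WindowStats(long windowStart, int count, double avgSpeed, long sumOfDiffs) {
            this.windowStart = windowStart;
            this.count = count;
            this.avgSpeed = avgSpeed;
            this.sumOfDiffs = sumOfDiffs;
        }
    }

    static final long WINDOW_SECONDS = 60;

    // Groups readings into one-minute tumbling windows and computes, per window,
    // the average speed and the sum of differences between consecutive readings.
    static List<WindowStats> aggregate(List<Reading> readings) {
        Map<Long, List<Reading>> windows = new TreeMap<>();
        for (Reading r : readings) {
            // Align the timestamp to the start of its minute.
            long start = r.timestamp - (r.timestamp % WINDOW_SECONDS);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(r);
        }
        List<WindowStats> result = new ArrayList<>();
        for (Map.Entry<Long, List<Reading>> e : windows.entrySet()) {
            List<Reading> w = e.getValue();
            long sum = 0;
            long diffs = 0;
            for (int i = 0; i < w.size(); i++) {
                sum += w.get(i).speed;
                if (i > 0) {
                    diffs += w.get(i).speed - w.get(i - 1).speed;
                }
            }
            result.add(new WindowStats(e.getKey(), w.size(), (double) sum / w.size(), diffs));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Reading> input = List.of(
                new Reading(60, 10), new Reading(90, 20), new Reading(125, 40));
        for (WindowStats s : aggregate(input)) {
            System.out.println(s.windowStart + " count=" + s.count
                    + " avg=" + s.avgSpeed + " diffs=" + s.sumOfDiffs);
        }
    }
}
```

In an actual topology the grouping would be done per sensor ID by the windowed store rather than by a `TreeMap`; the sketch only illustrates the per-window arithmetic and the input/output type change.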

Thank you again
Best

--
Alessandro Tagliapietra

On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:

> Hi Alessandro,
>
> the `TransformerSupplier` is internally wrapped with a `ProcessorSupplier`,
> so the statement
>
> `transform` is essentially equivalent to adding the Transformer via
> Topology#addProcessor() to your processor topology
>
> is correct.
>
> If you do not change the key, you should definitely use one of the
> overloads of `transformValues` to avoid internal data redistribution. In
> your case the overload with `ValueTransformerWithKeySupplier` as suggested
> by Matthias would fit.
>
> Best,
> Bruno
>
> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > There is also `ValueTransformerWithKey` that gives you read-only access
> > to the key.
> >
> > -Matthias
> >
> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > > Hi Bruno,
> > >
> > > Thank you for the quick answer.
> > >
> > > I'm actually trying to do that since it seems there is really no way to
> > > have it use `Processor<K, V>`.
> > > I just wanted (if that would've made any sense) to use the Processor in
> > > both DSL and non-DSL pipelines.
> > >
> > > Anyway, regarding `transformValues()` I don't think I can use it as I
> > > need the message key since that is the discriminating value for the
> > > filter (I want to exclude old values per sensor ID so per message key)
> > >
> > > Right now I have this
> > >
> > > https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > >
> > > and I'm using it with `transform()`.
> > > One thing I've found confusing is this
> > >
> > > https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> > >
> > > transform is essentially equivalent to adding the Transformer via
> > >> Topology#addProcessor() to your processor topology
> > >> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> > >
> > >
> > > Is it? Doesn't `transform` need a `TransformerSupplier` while
> > > `addProcessor` uses a `ProcessorSupplier`?
> > >
> > > Thank you again for your help
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io>
> > wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> Have you considered using `transform()` (actually in your case you
> > >> should use `transformValues()`) instead of `.process()`? `transform()`
> > >> and `transformValues()` are stateful operations similar to `.process()`
> > >> but they return a `KStream`. On a `KStream` you can then apply a
> > >> windowed aggregation.
> > >>
> > >> Hope that helps.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >>
> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >>> Hi there,
> > >>>
> > >>> I'm just starting with Kafka and I'm trying to create a stream
> > >>> processor that, in multiple stages:
> > >>>  - filters messages using a kv store so that only messages with a
> > >>>    higher timestamp get processed
> > >>>  - aggregates the message metrics by minute, giving e.g. the avg of
> > >>>    those metrics in that minute
> > >>>
> > >>> The message is simple: the key is the sensor ID and the value is
> > >>> e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
> > >>>
> > >>> I've started by creating a processor to use the kv store and filter
> > >>> old messages:
> > >>>
> > >>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> > >>>
> > >>> Then I tried to implement windowing. I saw very nice windowing
> > >>> examples for the DSL but none for the Processor API (only a small
> > >>> reference to the windowed store). Can someone point me in the right
> > >>> direction?
> > >>>
> > >>> Now, since I wasn't able to find any example, I tried to use the DSL
> > >>> but haven't found a way to use my processor with it. I saw this
> > >>>
> > >>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> > >>>
> > >>> but it explains mostly transformers, not processors. After that I also
> > >>> saw the example usage of the processor, but `.process(...)` returns
> > >>> void, so I cannot get a KStream from a processor?
> > >>>
> > >>> Thank you all in advance
> > >>>
> > >>> --
> > >>> Alessandro Tagliapietra
> > >>>
> > >>
> > >
> >
> >
>
