Bruno and Guozhang,

Thank you for the replies. Between the 2 of you, I think I know how to code
what I wanted. I'm going with

stream.flatTransform(...).groupByKey().aggregate()

because of an additional requirement I hadn't stated in my original
message: I plan to use a punctuate method to hold back some values until
certain time-outs or conditions are met. Per the documentation, I can't
forward records from a punctuation with the ...Values() versions.

Basically, I'm going to transform each of the many kinds of change events
into a Success, Fail, or Lost message. But I only want to report actual
state changes, so if I get multiple Fail messages, I only need to see the
first one. So I'm going to use a persistent KeyValueStore to store a
tracking object when a given key comes in, and check the table to decide
whether to actually emit a change. The punctuate method will iterate over
the persistent KeyValueStore to see whether any tracking objects are past a
deadline without having been emitted to the table, and if so, emit a Lost
message.
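A rough sketch of the tracking logic described above, written as plain Java so it runs without a Kafka dependency. Assumptions: a `LinkedHashMap` stands in for the persistent KeyValueStore, `punctuate()` stands in for the scheduled Punctuator, a change event that doesn't decide the outcome only starts tracking a key, and all names (`StatusTracker`, `onStarted`, `onOutcome`, the timeout) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of the dedup-plus-timeout logic; no Kafka classes are used.
// The LinkedHashMap stands in for the persistent KeyValueStore, and
// punctuate() stands in for the scheduled Punctuator. All names are hypothetical.
class StatusTracker {
    enum Outcome { SUCCESS, FAIL, LOST }

    // Tracking object kept per key: its deadline and the last outcome emitted.
    static final class Tracking {
        final long deadlineMs;
        Outcome lastEmitted; // null until something has been emitted for this key

        Tracking(long deadlineMs) {
            this.deadlineMs = deadlineMs;
        }
    }

    private final Map<String, Tracking> store = new LinkedHashMap<>();
    private final long timeoutMs;

    StatusTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Creates the tracking object the first time a key is seen.
    private Tracking track(String key, long nowMs) {
        return store.computeIfAbsent(key, k -> new Tracking(nowMs + timeoutMs));
    }

    // A change event that does not decide the outcome: start tracking, emit nothing.
    void onStarted(String key, long nowMs) {
        track(key, nowMs);
    }

    // A change event mapped to SUCCESS or FAIL; returns a value only on an actual state change.
    Optional<Outcome> onOutcome(String key, Outcome outcome, long nowMs) {
        Tracking t = track(key, nowMs);
        if (t.lastEmitted == Outcome.SUCCESS || t.lastEmitted == outcome) {
            return Optional.empty(); // SUCCESS is final; repeats (e.g. a second FAIL) are dropped
        }
        t.lastEmitted = outcome;
        return Optional.of(outcome);
    }

    // Punctuator model: emit LOST for keys past their deadline that have emitted nothing yet.
    List<String> punctuate(long nowMs) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, Tracking> entry : store.entrySet()) {
            Tracking t = entry.getValue();
            if (t.lastEmitted == null && nowMs >= t.deadlineMs) {
                t.lastEmitted = Outcome.LOST;
                lost.add(entry.getKey());
            }
        }
        return lost;
    }
}
```

In an actual Transformer, the map would be the KeyValueStore fetched in init(), punctuate() would be the Punctuator passed to context.schedule(...), the KeyValueIterator returned by store.all() has to be closed (try-with-resources works), and a modified tracking object has to be written back with put(), because a persistent store holds serialized copies rather than live objects.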


On Tue, Feb 25, 2020 at 5:22 PM Bruno Cadonna wrote:

> Hello Guozhang and Adam,
>
> Regarding Guozhang's proposal, please see the recent discussion about
> `transformValues()` and returning `null` from the transformer:
>
> https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> .
>
> With the current behavior, the commands should be:
>
> `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
>
> Best,
> Bruno
>
> On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang wrote:
> >
> > Hello Adam,
> >
> > It seems your intention is not to "avoid emitting if the new aggregation
> > result is the same as the old aggregation" but to "avoid processing the
> > aggregation at all if its state is already a certain value", right?
> >
> > In this case I think you can try something like this:
> >
> > `stream.transformValues().groupByKey().aggregate()`
> >
> > where transformValues is just used as a slightly more complicated "filter"
> > operation, in which you can access the state store that "aggregate" is
> > connected to, and read it to check whether the corresponding entry is
> > already `success`; if so, let `transformValues` return `null`, which means
> > forwarding nothing downstream.
> >
> > The reason to use transformValues instead of transform is to make sure you
> > do not introduce unnecessary repartitioning here.
> >
> > Guozhang
> >
> >
> >
> > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart wrote:
> >
> > > So I am trying to process incoming events that may or may not actually
> > > update the state of my output object. Originally I was doing this with a
> > > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > > Topic takes too long to be updated", at which point I switched to
> > > groupByKey().aggregate().
> > >
> > > Some events may not result in a state change. For example, once I have an
> > > incoming success event, I emit a success output, and any later incoming
> > > failure events are ignored.
> > >
> > > My intention is to emit a record from the aggregate KTable only if the
> > > aggregate record actually changed. But I can't figure out how to do that
> > > within the Aggregator interface. I've tried returning the incoming
> > > aggregate object unchanged when nothing changes, but a record is still
> > > emitted from the table.
> > >
> >
> >
> > --
> > -- Guozhang
>
