-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA512

Are you aware of KIP-557:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
Seems it will address your use case?

- -Matthias

On 2/25/20 6:45 PM, Adam Rinehart wrote:
> Bruno and Guozhang,
>
> Thank you for the replies. Between the 2 of you, I think I know how
> to code what I wanted. I'm going with
>
> stream.flatTransform(...).groupByKey().aggregate()
>
> because an additional requirement that I hadn't stated in the
> original message was that I was planning on using a punctuate method
> to delay some values from being forwarded until certain time-outs or
> conditions were met. I can't use punctuate with the ...Values()
> versions, per the documentation.
>
> Basically, I'm going to be transforming the events from one of many
> change events to a Success, Fail, or Lost message. But I only want
> to report actual state changes, so if I get multiple Fail messages,
> I only need to see the first one. So I'm going to use a persistent
> KeyValueStore to store a tracking object when a given key comes in,
> and check the table to see if I'm going to actually emit a change.
> The punctuate method will iterate over the persistent
> KeyValueStore to see if any tracking objects are past a deadline
> without having been emitted to the table, and if so, emit a Lost
> message.
>
>
> On Tue, Feb 25, 2020 at 5:22 PM Bruno Cadonna <[email protected]>
> wrote:
>
>> Hello Guozhang and Adam,
>>
>> Regarding Guozhang's proposal please see recent discussions
>> about `transformValues()` and returning `null` from the
>> transformer:
>>
>> https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
>>
>> With the current behavior, the commands should be:
>>
>> `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
>>
>> Best,
>> Bruno
>>
>> On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang
>> <[email protected]> wrote:
>>>
>>> Hello Adam,
>>>
>>> It seems your intention is not to "avoid emitting if the new
>>> aggregation result is the same as the old aggregation" but to
>>> "avoid processing the aggregation at all if its state is already
>>> some certain value", right?
>>>
>>> In this case I think you can try something like this:
>>>
>>> *stream.transformValues().groupByKey().aggregate()*
>>>
>>> where transformValues is just used as a slightly complicated
>>> "filter" operation, in which you can access the state store
>>> that "aggregate" is connected to, and read / check if the
>>> corresponding entry is already `success`; if yes, let
>>> `transformValues` return `null`, which means forward
>>> nothing to the downstream.
>>>
>>> The reason to use transformValues instead of transform is to
>>> make sure you do not introduce unnecessary repartitioning here.
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart
>>> <[email protected]> wrote:
>>>
>>>> So I am trying to process incoming events that may or may
>>>> not actually update the state of my output object. Originally
>>>> I was doing this with a KStream/KTable join, until I saw the
>>>> discussion about "KTable in Compact Topic takes too long to be
>>>> updated", when I switched to groupByKey().aggregate().
>>>>
>>>> Some events may not result in a state change. For example,
>>>> once I have an incoming success event, I emit a success output
>>>> and future incoming failure events will be ignored.
>>>>
>>>> My intention is to only emit a record from the aggregate
>>>> KTable if the aggregate record actually changed. But I can't
>>>> figure out how to do that within the aggregator interface.
>>>> I've tried returning the incoming aggregate object when nothing
>>>> changes, but I still get a record emitted from the table.
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>
>
-----BEGIN PGP SIGNATURE-----

iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5VYvQACgkQO4miYXKq
/Ojayg//S7UEfoGnbY777UvFCBhN6NwH7dZ/qeL0edaIDQqVYyK574Z8QvIRgg5P
H1RonvtDN3EcZlqCsMjdBuNo48G87XjejIeVlShelguLIx7qxx/Q4Z/gRtKAO+XD
zOrZwZzjVNYdo6FK+YytFo2SbxxV6UmsJU/YyBqrvZHBKAw+DInAdQYRvfHcaN+i
Vs8PPyyVK131ucxz2OTMKIu8u4JVVWPQDNAVBcIlbQava2M4Hg2hT7F3U/8P8veU
1Jwpsq1xtPtXngO3xVJn8oiR4LFDb9oNSR0JTay0fcCvjPsGWZskOuUKrgVfwMYn
mbovxELN04XpU63VpfNxZFPUFe1VkaqMkoJK+qTKhTmBM1+C5AXFkjvHlKjA4iYx
fT8NnHXLoXtCTvNjy8sXcAFaiDtzEz17I1l2Uks828KTkrUcEF00sMxNTd4/O2rl
j0jvRIoX3UdwcgJcWr7/HrYtPDbQ5MIu6wRAiC3fn8OTonhrdLn/WokKsDq6VqJy
UJclbFukAsdSDXj7PZcXqNVaW0+bKfyA10SugtcFkEaVZxMfjPKBri1DhG/0QquR
HQMUvPjXLEuw6Ojkb5qdE+lmEtwmdH0PBqVGv+1jj8ho+qE3T/hRB0zUSsPK1ADc
SQbp6+zW+xZ+mGqARGx536MiQN0KJ7+N5Ld1lgbNjqgC1KPCJjk=
=nOk0
-----END PGP SIGNATURE-----
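
For anyone following the thread, the tracking logic Adam describes (store a tracking object per key, forward only real state changes, and let a periodic pass emit Lost for keys past their deadline) might look roughly like the plain-Java sketch below. It deliberately uses no Kafka Streams classes: a HashMap stands in for the persistent KeyValueStore, and the names ChangeTracker, Tracking, onPending, onOutcome and the timeout handling are all made up for illustration. The rules "ignore everything after Success" and "only keys with nothing emitted yet can become Lost" are one reading of the messages above, not something the thread pins down.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java stand-in for the transformer logic; not Kafka Streams API. */
class ChangeTracker {
    enum Status { SUCCESS, FAIL, LOST }

    /** Hypothetical tracking object: last emitted status plus a deadline. */
    private static final class Tracking {
        Status lastEmitted;      // null until something was emitted for the key
        final long deadlineMs;
        Tracking(long deadlineMs) { this.deadlineMs = deadlineMs; }
    }

    // A HashMap plays the role of the persistent KeyValueStore (assumption).
    private final Map<String, Tracking> store = new HashMap<>();
    private final long timeoutMs;

    ChangeTracker(long timeoutMs) { this.timeoutMs = timeoutMs; }

    /** Event without an outcome yet: start tracking the key if it is new. */
    void onPending(String key, long nowMs) {
        store.computeIfAbsent(key, k -> new Tracking(nowMs + timeoutMs));
    }

    /** Outcome event: returns the status to forward, or empty if nothing changed. */
    Optional<Status> onOutcome(String key, Status status, long nowMs) {
        Tracking t = store.computeIfAbsent(key, k -> new Tracking(nowMs + timeoutMs));
        // Assumed rules: SUCCESS is final, and a repeated status is not a change.
        if (t.lastEmitted == Status.SUCCESS || t.lastEmitted == status) {
            return Optional.empty();
        }
        t.lastEmitted = status;
        return Optional.of(status);
    }

    /** Stand-in for punctuate: keys past the deadline with nothing emitted become LOST. */
    List<String> punctuate(long nowMs) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, Tracking> e : store.entrySet()) {
            Tracking t = e.getValue();
            if (t.lastEmitted == null && nowMs >= t.deadlineMs) {
                t.lastEmitted = Status.LOST;
                lost.add(e.getKey());
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        ChangeTracker tracker = new ChangeTracker(1_000L);
        System.out.println(tracker.onOutcome("a", Status.FAIL, 0L));     // first FAIL is forwarded
        System.out.println(tracker.onOutcome("a", Status.FAIL, 10L));    // repeated FAIL is suppressed
        System.out.println(tracker.onOutcome("a", Status.SUCCESS, 20L)); // change to SUCCESS is forwarded
        System.out.println(tracker.onOutcome("a", Status.FAIL, 30L));    // ignored after SUCCESS
        tracker.onPending("b", 0L);
        System.out.println(tracker.punctuate(500L));   // before the deadline, nothing is lost
        System.out.println(tracker.punctuate(1_000L)); // at the deadline, "b" is reported lost
    }
}
```

In a real topology the two event methods would presumably sit inside the transformer passed to flatTransform(), with the map replaced by the state store and punctuate() scheduled through the processor context; that wiring is left out here on purpose.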
