Are you aware of KIP-557:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams

It seems like it would address your use case?
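For illustration only, here is a plain-Java sketch (no Kafka Streams dependency) of the emit-on-change idea behind KIP-557, applied to the problem in the original question quoted below: returning the unchanged aggregate still produces an output record. The class `EmitOnChangeTable` and its simplified `(value, oldAggregate) -> newAggregate` function are hypothetical and are not the Kafka Streams `Aggregator` API (which also receives the key and starts from an `Initializer`; here `null` means "no aggregate yet"). Comparing aggregates with `Objects.equals` is also an assumption; how Kafka Streams itself detects a no-op update is defined by the KIP and its implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical model of an aggregated table that forwards an update only when the aggregate changes.
class EmitOnChangeTable<V, A> {
    private final Map<String, A> store = new HashMap<>(); // key -> current aggregate
    private final BiFunction<V, A, A> aggregator;         // (new value, old aggregate or null) -> new aggregate
    final List<String> emitted = new ArrayList<>();       // records that would be forwarded downstream

    EmitOnChangeTable(BiFunction<V, A, A> aggregator) {
        this.aggregator = aggregator;
    }

    void update(String key, V value) {
        A oldAgg = store.get(key);
        A newAgg = aggregator.apply(value, oldAgg);
        if (Objects.equals(oldAgg, newAgg)) {
            return; // no-op update: drop it instead of emitting a duplicate record
        }
        store.put(key, newAgg);
        emitted.add(key + "=" + newAgg);
    }
}
```

With an aggregator that keeps `success` once it has been reached, feeding `fail`, `fail`, `success`, `fail` for one key forwards only `fail` and `success`.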


-Matthias


On 2/25/20 6:45 PM, Adam Rinehart wrote:
> Bruno and Guozhang,
>
> Thank you for the replies. Between the 2 of you, I think I know how
> to code what I wanted. I'm going with
>
> stream.flatTransform(...).groupByKey().aggregate()
>
> because of an additional requirement that I hadn't stated in my
> original message: I plan to use a punctuate method to delay
> forwarding some values until certain time-outs or conditions are
> met. Per the documentation, I can't forward records from punctuate
> with the ...Values() versions.
>
> Basically, I'm going to transform each incoming event (one of many
> change-event types) into a Success, Fail, or Lost message. But I only
> want to report actual state changes, so if I get multiple Fail
> messages, I only need to see the first one. So I'm going to use a
> persistent KeyValueStore to store a tracking object when a given key
> comes in, and check the store to decide whether to actually emit a
> change. The punctuate method will iterate over the persistent
> KeyValueStore to see whether any tracking objects are past a deadline
> without having been emitted to the table, and if so, emit a Lost
> message.
>
>
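A rough model of the plan above in plain Java, assuming hypothetical `Status` values (`PENDING` standing for change events that do not decide the outcome yet) and a fixed per-key timeout. A `HashMap` stands in for the persistent `KeyValueStore`, `process()` plays the role of the transformer's per-record logic, and `punctuate()` plays the role of the scheduled punctuation. It sketches only the bookkeeping; wiring it into `flatTransform()` with a real store and `ProcessorContext.schedule()` is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical outcome types; PENDING stands for change events that do not decide the outcome yet.
enum Status { PENDING, SUCCESS, FAIL, LOST }

// Hypothetical per-key tracking object (what the persistent KeyValueStore would hold).
class Tracking {
    final long deadline; // after this time an undecided key is reported as LOST
    Status lastEmitted;  // null until something has been forwarded for this key

    Tracking(long deadline) {
        this.deadline = deadline;
    }
}

class StatusTracker {
    // Stand-in for a persistent KeyValueStore<String, Tracking>.
    private final Map<String, Tracking> store = new HashMap<>();
    private final long timeoutMs;

    StatusTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Per-record logic: returns the message to forward, or empty if the state did not change.
    Optional<Status> process(String key, Status status, long timestamp) {
        Tracking t = store.computeIfAbsent(key, k -> new Tracking(timestamp + timeoutMs));
        boolean noChange = status == Status.PENDING     // outcome not decided yet
                || status == t.lastEmitted              // e.g. a second FAIL
                || t.lastEmitted == Status.SUCCESS;     // SUCCESS is final
        if (noChange) {
            return Optional.empty();
        }
        t.lastEmitted = status;
        return Optional.of(status);
    }

    // Periodic check (the punctuate() role): reports keys past their deadline with nothing emitted.
    List<String> punctuate(long now) {
        List<String> lost = new ArrayList<>();
        for (Map.Entry<String, Tracking> e : store.entrySet()) {
            Tracking t = e.getValue();
            if (t.lastEmitted == null && now >= t.deadline) {
                t.lastEmitted = Status.LOST; // report each key as LOST at most once
                lost.add(e.getKey());
            }
        }
        return lost;
    }
}
```

Two FAILs for a key forward only the first; once SUCCESS has been forwarded, later FAILs are dropped; a key that only ever saw PENDING is reported once by the first `punctuate()` at or past its deadline. Whether a late Success after Lost should still be forwarded is a policy choice; this sketch forwards it.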
> On Tue, Feb 25, 2020 at 5:22 PM Bruno Cadonna <[email protected]>
> wrote:
>
>> Hello Guozhang and Adam,
>>
>> Regarding Guozhang's proposal, please see the recent discussion
>> about `transformValues()` and returning `null` from the
>> transformer:
>>
>> https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
>>
>> With the current behavior, the commands should be:
>>
>> `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
>>
>> Best, Bruno
>>
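To make the ordering concrete, here is a plain-Java model of `transformValues(...).filter(...).groupByKey().aggregate()` as Guozhang and Bruno describe it: the transform step reads the aggregate's store and yields `null` when the entry is already `success`, and because (per the discussion Bruno links) such a `null` is still forwarded as a record, a separate null filter drops it before aggregation. The class name and the "latest value wins" aggregator are hypothetical, and a `HashMap` stands in for the state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of transform -> filter(non-null) -> aggregate sharing one state store.
class FilterBeforeAggregate {
    // Stand-in for the state store that aggregate() is connected to (key -> current aggregate).
    final Map<String, String> aggregateStore = new HashMap<>();

    // Plays the role of the ValueTransformer: yields null when the stored aggregate is already "success".
    String transformValue(String key, String value) {
        return "success".equals(aggregateStore.get(key)) ? null : value;
    }

    // Processes records in order; returns the key=value pairs that reached the aggregator.
    List<String> process(List<Map.Entry<String, String>> events) {
        List<String> aggregated = new ArrayList<>();
        for (Map.Entry<String, String> e : events) {
            String v = transformValue(e.getKey(), e.getValue()); // may be null, like transformValues()
            if (v == null) {
                continue; // the filter((k, v) -> v != null) step
            }
            aggregateStore.put(e.getKey(), v); // hypothetical aggregator: latest value wins
            aggregated.add(e.getKey() + "=" + v);
        }
        return aggregated;
    }
}
```

Without the null check, the third record below would reach the aggregator as a null value instead of being dropped.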
>> On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang
>> <[email protected]> wrote:
>>>
>>> Hello Adam,
>>>
>>> It seems your intention is not to "avoid emitting if the new
>>> aggregation result is the same as the old aggregation" but to
>>> "avoid processing the aggregation at all if its state is already
>>> some certain value", right?
>>>
>>> In this case, I think you can try something like this:
>>>
>>> `stream.transformValues(...).groupByKey().aggregate()`
>>>
>>> where transformValues is just used as a slightly more complicated
>>> "filter" operation: in it you can access the state store that
>>> "aggregate" is connected to and check whether the corresponding
>>> entry is already `success`; if so, let `transformValues` return
>>> `null`, which means forwarding nothing downstream.
>>>
>>> The reason to use transformValues instead of transform is to make
>>> sure you do not introduce unnecessary repartitioning here.
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart
>>> <[email protected]> wrote:
>>>
>>>> So I am trying to process incoming events that may or may not
>>>> actually update the state of my output object. Originally I was
>>>> doing this with a KStream/KTable join, until I saw the discussion
>>>> about "KTable in Compact Topic takes too long to be updated", at
>>>> which point I switched to groupByKey().aggregate().
>>>>
>>>> Some events may not result in a state change. For example, once I
>>>> have an incoming success event, I emit a success output, and
>>>> future incoming failure events will be ignored.
>>>>
>>>> My intention is to only emit a record from the aggregate KTable if
>>>> the aggregate record actually changed. But I can't figure out how
>>>> to do that within the aggregator interface. I've tried returning
>>>> the incoming aggregate object when nothing changes, but I still
>>>> get a record emitted from the table.
>>>>
>>>
>>>
>>> -- Guozhang
>>
>