Hi Dongjin,

No problem; glad we got it sorted out.

Thanks again for picking this up!
-John

On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > I was under the impression that you wanted to expand the scope of the KIP
> > to additionally allow querying the internal buffer, not just the result.
> > Can you clarify whether you are proposing to allow querying the state of
> > the internal buffer, the result, or both?
>
> Sorry for the confusion. As we already discussed, we only need to query
> the suppressed output, not the internal buffer. The current implementation
> is wrong. After refining the KIP and implementation accordingly I will
> notify you - I must have been confused as well.
>
> Thanks,
> Dongjin
>
> On Tue, Feb 25, 2020 at 12:17 AM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi Dongjin,
> >
> > Ah, I think I may have been confused. I 100% agree that we need a
> > materialized variant for suppress(). Then, you could do:
> > ...suppress(..., Materialized.as("final-count"))
> >
> > If that’s your proposal, then we are on the same page.
> >
> > I was under the impression that you wanted to expand the scope of the KIP
> > to additionally allow querying the internal buffer, not just the result.
> > Can you clarify whether you are proposing to allow querying the state of
> > the internal buffer, the result, or both?
> >
> > Thanks,
> > John
> >
> > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > Hi John,
> > > Thanks for your kind explanation with an example.
> > >
> > > > But it feels like you're saying you're trying to do something different
> > > > than just query the windowed key and get back the current count?
> > >
> > > Yes, for example, what if we need to retrieve the (all or range) keys with
> > > a closed window? In this example, let's imagine we need to retrieve only
> > > (key=A, window=10), not (key=A, window=20).
> > >
> > > Of course, the value accompanying a flushed key is exactly the same as
> > > the one in the upstream KTable. However, if our intention is not to look up
> > > a specific key but to retrieve a group of unspecified keys, we are stuck,
> > > since we can't know beforehand which keys have been flushed.
> > >
> > > One workaround would be materializing it with `suppressed.filter((k, v) ->
> > > true, Materialized.as("final-count"))`. But I think providing a materialized
> > > variant of the suppress method is better than this workaround.
> > >
> > > Thanks,
> > > Dongjin
> > >
> > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Thanks for the response, Dongjin,
> > > >
> > > > I'm sorry, but I'm still not following. It seems like the view you would
> > > > get on the "current state of the buffer" would always be equivalent to
> > > > the view of the upstream table.
> > > >
> > > > Let me try an example, and maybe you can point out the flaw in my
> > > > reasoning.
> > > >
> > > > Let's say we're doing 10 ms windows with a grace period of zero.
> > > > Let's also say we're computing a windowed count, and that we have
> > > > a "final results" suppression after the count. Let's materialize the
> > > > count as "Count" and the suppressed result as "Final Count".
> > > >
> > > > Suppose we get an input event:
> > > > (time=10, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 1     |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 1     |
> > > >
> > > > The record is still buffered because the window isn't closed yet.
> > > > Final Count is an empty table:
> > > >
> > > > | window | key | value |
> > > >
> > > > ---------------
> > > >
> > > > Now, we get a second event:
> > > > (time=15, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 2     |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 2     |
> > > >
> > > > The record is still buffered because the window isn't closed yet.
> > > > Final Count is an empty table:
> > > >
> > > > | window | key | value |
> > > >
> > > > ---------------
> > > >
> > > > Finally, we get a third event:
> > > > (time=20, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 2     |
> > > > | 20     | A   | 1     |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 20     | A   | 1     |
> > > >
> > > > Note that window 10 has been flushed out, because it's now closed.
> > > > And window 20 is buffered because it isn't closed yet.
> > > > Final Count is now:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   | 2     |
> > > >
> > > > ---------------
> > > >
> > > > Reading your email, I can't figure out what value there is in querying the
> > > > internal suppression buffer, since it only contains exactly the same value as
> > > > the upstream table, for each key that is still buffered. But it feels like
> > > > you're saying you're trying to do something different than just query the
> > > > windowed key and get back the current count?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > > > Hi John,
> > > > >
> > > > > 'The intermediate state of the suppression' in the KIP does not mean the
> > > > > state of the upstream KTable - sure, the state of the upstream KTable can be
> > > > > queried by materializing the operator immediately before the suppress, as
> > > > > you have shown.
> > > > > What I meant in the KIP was the final state of the buffer, which is not
> > > > > emitted yet. (I agree, the current description may be confusing; it would be
> > > > > better to replace it with 'the current state of the suppression' or 'the
> > > > > results of the suppression', as the Jira issue
> > > > > <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > > > >
> > > > > For a little more about the motivation, here is one of my experiences: I
> > > > > had to build a monitoring application which collects signals from IoT
> > > > > devices (say, a semiconductor production line). If the number of signals
> > > > > collected within the time window is much lower than expected, there may be
> > > > > some problem in the system, such as a network hiccup. We wanted to build the
> > > > > system in the form of a dashboard, but could not for lack of a materializing
> > > > > feature. It was precisely the case of querying only the final results of a
> > > > > windowed aggregation, as the Jira issue
> > > > > <https://issues.apache.org/jira/browse/KAFKA-8403> states. We finally ended
> > > > > up implementing it as an email alerting system like this
> > > > > <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> > > > > and had to collect the troubled keys and windows by hand.
> > > > >
> > > > > I think these kinds of use cases are quite common. Should they be
> > > > > described in more detail in the KIP?
> > > > >
> > > > > Thanks,
> > > > > Dongjin
> > > > >
> > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > > Hi Dongjin,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > Can you explain more about why the internal data structures of suppression
> > > > > > should be queryable?
> > > > > > The motivation just says that users might want to do
> > > > > > it, which seems like it could justify literally anything :)
> > > > > >
> > > > > > One design point of Suppression is that if you wanted to query the “final
> > > > > > state”, you can Materialize the suppress itself (which is why it needs the
> > > > > > variant); if you wanted to query the “intermediate state”, you can
> > > > > > materialize the operator immediately before the suppress.
> > > > > >
> > > > > > Example:
> > > > > >
> > > > > > ...count(Materialized.as("intermediate"))
> > > > > > .suppress(untilWindowCloses(...), Materialized.as("final"))
> > > > > >
> > > > > > I’m not sure what use case would require actually fetching from the
> > > > > > internal buffers.
> > > > > >
> > > > > > Thanks,
> > > > > > John
> > > > > >
> > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I'd like to reboot the discussion on KIP-508, which aims to support a
> > > > > > > Materialized variant of KTable#suppress. It was initially submitted several
> > > > > > > months ago but closed due to inactivity.
> > > > > > >
> > > > > > > - KIP:
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > >
> > > > > > > All kinds of feedback will be greatly appreciated.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongjin
> > > > > > >
> > > > > > > --
> > > > > > > *Dongjin Lee*
> > > > > > >
> > > > > > > *A hitchhiker in the mathematical world.*
> > > > > > > *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> > > > > > > linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> > > > > > > speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
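
For anyone following along: the three-event walkthrough quoted above (10 ms windows, zero grace period, a windowed count followed by a "final results" suppression) can be replayed with a small stand-alone Java model. This is only a sketch of the semantics described in the thread, not Kafka Streams code. The class `SuppressionWalkthrough`, its fields, and `process()` are hypothetical names made up for illustration, and the model assumes non-negative timestamps and that the buffer is flushed as soon as stream time reaches the window end plus the grace period.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Toy model (an assumption, not the Kafka Streams implementation) of a
// windowed count followed by a "final results" suppression.
class SuppressionWalkthrough {
    static final long WINDOW_SIZE_MS = 10;
    static final long GRACE_MS = 0;

    // Each table maps window start -> (key -> count).
    final TreeMap<Long, TreeMap<String, Long>> count = new TreeMap<>();      // "Count"
    final TreeMap<Long, TreeMap<String, Long>> buffer = new TreeMap<>();     // internal buffer
    final TreeMap<Long, TreeMap<String, Long>> finalCount = new TreeMap<>(); // "Final Count"
    long streamTime = Long.MIN_VALUE;

    // A window is closed once stream time reaches its end plus the grace period.
    static boolean isClosed(long windowStart, long streamTime) {
        return windowStart + WINDOW_SIZE_MS + GRACE_MS <= streamTime;
    }

    void process(long timestamp, String key) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);

        // Records for already-closed windows are dropped in this model.
        if (!isClosed(windowStart, streamTime)) {
            long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                                .merge(key, 1L, Long::sum);
            // The buffer holds the latest upstream value for each open window.
            buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);
        }

        // Flush every buffered window that has closed into the final table.
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> e = it.next();
            if (isClosed(e.getKey(), streamTime)) {
                finalCount.computeIfAbsent(e.getKey(), w -> new TreeMap<>()).putAll(e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        SuppressionWalkthrough s = new SuppressionWalkthrough();
        for (long t : new long[] {10, 15, 20}) {
            s.process(t, "A");
            System.out.println("t=" + t + " count=" + s.count
                    + " buffer=" + s.buffer + " final=" + s.finalCount);
        }
    }
}
```

Running `main` replays the events at time 10, 15 and 20 for key A. After the third event, window 10 moves from the buffer into Final Count with a value of 2, while window 20 stays buffered. In this model the buffer never holds anything that Count does not, which is the point John makes; Final Count is the only table that lists just the closed windows, which is the case Dongjin wants to query.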