Hi Dongjin,

No problem; glad we got it sorted out.

Thanks again for picking this up!
-John

On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > I was under the impression that you wanted to expand the scope of the KIP
> > to additionally allow querying the internal buffer, not just the result.
> > Can you clarify whether you are proposing to allow querying the state of
> > the internal buffer, the result, or both?
> 
> Sorry for the confusion. As we already discussed, we only need to query
> the suppressed output, not the internal buffer. The current implementation
> is wrong. I will refine the KIP and the implementation accordingly and let
> you know when they are ready. I must have been confused as well.
> 
> Thanks,
> Dongjin
> 
> On Tue, Feb 25, 2020 at 12:17 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi Dongjin,
> >
> > Ah, I think I may have been confused. I 100% agree that we need a
> > materialized variant for suppress(). Then, you could do:
> > ...suppress(..., Materialized.as("final-count"))
> >
> > If that’s your proposal, then we are on the same page.
> >
> > I was under the impression that you wanted to expand the scope of the KIP
> > to additionally allow querying the internal buffer, not just the result.
> > Can you clarify whether you are proposing to allow querying the state of
> > the internal buffer, the result, or both?
> >
> > Thanks,
> > John
> >
> > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > Hi John,
> > > Thanks for the kind explanation and the example.
> > >
> > > > But it feels like you're saying you're trying to do something different
> > > > than just query the windowed key and get back the current count?
> > >
> > > Yes. For example, what if we need to retrieve all keys (or a range of
> > > keys) whose window is closed? In this example, suppose we need to
> > > retrieve only (key=A, window=10), not (key=A, window=20).
> > >
> > > Of course, the value accompanying a flushed key is exactly the same as
> > > the one in the upstream KTable. However, if we want to retrieve a group
> > > of unspecified keys rather than look up a specific key, we are stuck,
> > > since we can't know beforehand which keys have been flushed.
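A minimal, stdlib-only Java sketch of the point above (a toy model, not Kafka Streams code; the class and method names are hypothetical). It takes the upstream Count values for key A from John's example further down the thread (window start mapped to count) and asks which windows are closed. The 10 ms window size and zero grace come from that example; the rest is an assumption. The answer depends on the stream time, which the upstream table does not carry, and stream time can advance because of records for other keys, so the same table yields different answers.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Toy model, not Kafka Streams: tumbling windows keyed by window start (ms).
public class ClosedWindowScan {
    static final long WINDOW_SIZE_MS = 10; // from the example: 10 ms windows
    static final long GRACE_MS = 0;        // from the example: zero grace

    // Returns the window starts in the upstream table that are closed at the
    // given stream time. streamTime must be supplied by the caller: it is not
    // stored in the upstream table itself.
    static List<Long> closedWindows(Map<Long, Long> upstreamCounts, long streamTime) {
        return upstreamCounts.keySet().stream()
                .filter(start -> start + WINDOW_SIZE_MS + GRACE_MS <= streamTime)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Upstream "Count" for key A after events at t=10, 15 and 20.
        Map<Long, Long> count = new TreeMap<>(Map.of(10L, 2L, 20L, 1L));
        // Same table, two stream times (e.g. after records for other keys arrive).
        System.out.println(closedWindows(count, 20)); // [10]
        System.out.println(closedWindows(count, 30)); // [10, 20]
    }
}
```

A materialized final-results table would hold exactly the closed windows, so a range or full scan over it needs no knowledge of stream time.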
> > >
> > > One workaround would be materializing it with
> > > `suppressed.filter((key, value) -> true, Materialized.as("final-count"))`.
> > > But I think providing a materialized variant of the suppress method is
> > > better than this workaround.
> > >
> > > Thanks,
> > > Dongjin
> > >
> > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Thanks for the response, Dongjin,
> > > >
> > > > I'm sorry, but I'm still not following. It seems like the view you would
> > > > get on the "current state of the buffer" would always be equivalent to
> > > > the view of the upstream table.
> > > >
> > > > Let me try an example, and maybe you can point out the flaw in my
> > > > reasoning.
> > > >
> > > > Let's say we're doing 10 ms windows with a grace period of zero.
> > > > Let's also say we're computing a windowed count, and that we have
> > > > a "final results" suppression after the count. Let's materialize the
> > > > count as "Count" and the suppressed result as "Final Count".
> > > >
> > > > Suppose we get an input event:
> > > > (time=10, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     1 |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     1 |
> > > >
> > > > The record is still buffered because the window isn't closed yet.
> > > > Final Count is an empty table:
> > > >
> > > > | window | key | value |
> > > >
> > > > ---------------
> > > >
> > > > Now, we get a second event:
> > > > (time=15, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     2 |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     2 |
> > > >
> > > > The record is still buffered because the window isn't closed yet.
> > > > Final Count is an empty table:
> > > >
> > > > | window | key | value |
> > > >
> > > >
> > > > ---------------
> > > >
> > > > Finally, we get a third event:
> > > > (time=20, key=A, value=...)
> > > >
> > > > Then, Count will look like:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     2 |
> > > > | 20     | A   |     1 |
> > > >
> > > > The (internal) suppression buffer will contain:
> > > >
> > > > | window | key | value |
> > > > | 20     | A   |     1 |
> > > >
> > > > Note that window 10 has been flushed out, because it's now closed.
> > > > And window 20 is buffered because it isn't closed yet.
> > > > Final Count is now:
> > > >
> > > > | window | key | value |
> > > > | 10     | A   |     2 |
> > > >
> > > >
> > > > ---------------
> > > >
> > > > Reading your email, I can't figure out what value there is in querying the
> > > > internal suppression buffer, since it only contains exactly the same value
> > > > as the upstream table, for each key that is still buffered. But it feels like
> > > > you're saying you're trying to do something different than just query the
> > > > windowed key and get back the current count?
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > >
> > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > > > Hi John,
> > > > >
> > > > > 'The intermediate state of the suppression' in the KIP does not mean the
> > > > > state of the upstream KTable; sure, the state of the upstream KTable can
> > > > > be queried by materializing the operator immediately before the suppress,
> > > > > as you showed. What I meant in the KIP was the final state of the buffer,
> > > > > which has not been emitted yet. (I agree the current description may be
> > > > > confusing; it would be better to change it to 'the current state of the
> > > > > suppression' or 'the results of the suppression', as the Jira issue
> > > > > <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> > > > >
> > > > > For a little more about the motivation, here is one of my experiences: I
> > > > > had to build a monitoring application that collects signals from IoT
> > > > > devices (say, on a semiconductor production line). If the number of
> > > > > signals collected within a time window is much lower than expected,
> > > > > there may be a problem, such as a network hiccup in the system. We
> > > > > wanted to build the system as a dashboard, but could not because of the
> > > > > lack of a materialization feature. It was precisely the case of querying
> > > > > only the final results of a windowed aggregation, as the Jira issue
> > > > > <https://issues.apache.org/jira/browse/KAFKA-8403> states. We ended up
> > > > > implementing it as an email alerting system like this
> > > > > <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> > > > > and had to collect the problematic keys and windows by hand.
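A minimal sketch of the dashboard query described above, assuming the final results of the windowed count were materialized and could be read as a map from windowed key to signal count. The class, method, key format, threshold, and sample counts are all hypothetical; in a real application the map would come from an interactive query against the materialized store rather than being built in memory.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical dashboard check over *final* windowed counts only: because every
// entry comes from a closed window, a low count cannot be a still-open window.
public class LowSignalWindows {
    // Returns the closed windows whose signal count fell below minExpected.
    static Map<String, Long> lowSignalWindows(Map<String, Long> finalCounts, long minExpected) {
        Map<String, Long> result = new TreeMap<>();
        finalCounts.forEach((windowedKey, n) -> {
            if (n < minExpected) {
                result.put(windowedKey, n);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample data: "device@windowStartMs" -> number of signals received.
        Map<String, Long> finalCounts = new TreeMap<>(Map.of(
                "device-1@0", 100L,
                "device-2@0", 12L,
                "device-1@60000", 98L));
        System.out.println(lowSignalWindows(finalCounts, 80)); // {device-2@0=12}
    }
}
```

Running the same check against the upstream table would also flag windows that are still open and simply have not collected all their signals yet, which is why this use case needs the final results.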
> > > > >
> > > > > I think these kinds of use cases are quite common. Should they be
> > > > > described in more detail in the KIP?
> > > > >
> > > > > Thanks,
> > > > > Dongjin
> > > > >
> > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > > Hi Dongjin,
> > > > > >
> > > > > > Thanks for the KIP!
> > > > > >
> > > > > > Can you explain more about why the internal data structures of suppression
> > > > > > should be queryable? The motivation just says that users might want to do
> > > > > > it, which seems like it could justify literally anything :)
> > > > > >
> > > > > > One design point of Suppression is that if you wanted to query the “final
> > > > > > state”, you can Materialize the suppress itself (which is why it needs the
> > > > > > variant); if you wanted to query the “intermediate state”, you can
> > > > > > materialize the operator immediately before the suppress.
> > > > > >
> > > > > > Example:
> > > > > >
> > > > > > ...count(Materialized.as("intermediate"))
> > > > > >   .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> > > > > >
> > > > > > I’m not sure what use case would require actually fetching from the
> > > > > > internal buffers.
> > > > > >
> > > > > > Thanks,
> > > > > > John
> > > > > >
> > > > > >
> > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> > > > > > > Hi devs,
> > > > > > >
> > > > > > > I'd like to reboot the discussion on KIP-508, which aims to support a
> > > > > > > Materialized variant of KTable#suppress. It was initially submitted
> > > > > > > several months ago but was closed due to inactivity.
> > > > > > >
> > > > > > > - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > >
> > > > > > > All kinds of feedback will be greatly appreciated.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongjin
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> >
> -- 
> *Dongjin Lee*
> 
> *A hitchhiker in the mathematical world.*
> *github: https://github.com/dongjinleekr*
> *linkedin: https://kr.linkedin.com/in/dongjinleekr*
> *speakerdeck: https://speakerdeck.com/dongjin*
>
