Hi Dongjin,

Sorry for the delay. I'm glad you're still pushing this
forward. It would be nice to get this into the 2.7 release.

I just took another look at the KIP, and it looks good to
me!

I think this is ready for a vote.

Thanks,
-John

On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> Hi All,
> 
> I updated the KIP
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable>
> and the implementation, following the discussion here.
> 
> You must be working hard preparing the release of 2.6.0, so please have a
> look after your work is done.
> 
> Thanks,
> Dongjin
> 
> On Sun, Mar 8, 2020 at 12:20 PM John Roesler <vvcep...@apache.org> wrote:
> 
> > Thanks Matthias,
> > 
> > Good idea. I've changed the ticket name and added a note
> > clarifying that this ticket is not the same as
> > https://issues.apache.org/jira/browse/KAFKA-7224
> > 
> > Incidentally, I learned that I never documented my reasons
> > for abandoning my work on KAFKA-7224 ! I've now updated
> > that ticket, too, so your question had an unexpected side-benefit.
> > 
> > Thanks,
> > -John
> > 
> > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > -----BEGIN PGP SIGNED MESSAGE-----
> > > Hash: SHA512
> > > 
> > > Thanks for clarification.
> > > 
> > > Can you maybe update the Jira ticket? Do we have a ticket for
> > > spill-to-disk? Maybe link to it and explain that it's two different
> > > > things? Maybe even rename the ticket to something more clear, i.e.,
> > > > "make suppress result queryable" or similar?
> > > 
> > > 
> > > - -Matthias
> > > 
> > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > Hey Matthias,
> > > > 
> > > > > I’m sorry if the ticket was poorly stated. The ticket is to add a
> > > > > DSL overload to pass a Materialized argument to suppress. As a result,
> > > > > the result of the suppression would be queriable.
> > > > > 
> > > > > This is unrelated to “persistent buffer” aka “spill-to-disk”.
> > > > > 
> > > > > There was some confusion before about whether this ticket could be
> > > > > implemented as “query the buffer”. Maybe it can, but not trivially.
> > > > > The obvious way is just to add a new state store which we write the
> > > > > results into just before we forward. I.e., it’s exactly like the
> > > > > materialized variant of any stateless KTable operation.
> > > > > 
> > > > > Thanks, John
> > > > 
> > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> > > > > Thanks for the KIP Dongjin,
> > > > 
> > > > > I am still not sure I can follow, which might also be caused by
> > > > > the backing Jira ticket (maybe John can clarify the intent of the
> > > > > ticket, as he created it):
> > > > 
> > > > > Currently, suppress() only uses an in-memory buffer, and my
> > > > > understanding of the Jira is that it asks for the ability to use a
> > > > > persistent buffer (i.e., spill to disk backed by RocksDB).
> > > > > 
> > > > > Adding a persistent buffer is completely unrelated to allowing
> > > > > queries against the buffer. In fact, one could query an in-memory
> > > > > buffer, too. However, querying the buffer does not really seem to be
> > > > > useful, as pointed out by John, as you can always query the upstream
> > > > > KTable store.
> > > > 
> > > > Also note that for the emit-on-window-close case the result is
> > > > deleted from the buffer when it is emitted, and thus cannot be
> > > > > queried any longer.
> > > > 
> > > > 
> > > > > Can you please clarify if you intend to allow spilling to disk or
> > > > > if you intend to enable IQ (even if I don't see why querying makes
> > > > > sense, as the data is either upstream or deleted). Also, if you
> > > > > want to enable IQ, why do we need all those new interfaces? The
> > > > > result of a suppress() is a KTable that is the same as any other
> > > > > key-value/windowed/session store?
> > > > 
> > > > We should also have corresponding Jira tickets for different cases
> > > > to avoid the confusion I am in atm :)
> > > > 
> > > > 
> > > > -Matthias
> > > > 
> > > > 
> > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > Hi Dongjin,
> > > > > > > 
> > > > > > > No problem; glad we got it sorted out.
> > > > > > > 
> > > > > > > Thanks again for picking this up! -John
> > > > > > > 
> > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > > > > > > > > > I was under the impression that you wanted to expand the
> > > > > > > > > > scope of the KIP to additionally allow querying the
> > > > > > > > > > internal buffer, not just the result. Can you clarify
> > > > > > > > > > whether you are proposing to allow querying the state of
> > > > > > > > > > the internal buffer, the result, or both?
> > > > > > > > 
> > > > > > > > > Sorry for the confusion. As we already discussed, we only
> > > > > > > > > need to query the suppressed output, not the internal
> > > > > > > > > buffer. The current implementation is wrong. I will notify
> > > > > > > > > you after refining the KIP and implementation accordingly.
> > > > > > > > > I must have been confused, too.
> > > > > > > > 
> > > > > > > > Thanks, Dongjin
> > > > > > > > 
> > > > > > > > On Tue, Feb 25, 2020 at 12:17 AM John Roesler
> > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > 
> > > > > > > > > Hi Dongjin,
> > > > > > > > > 
> > > > > > > > > > Ah, I think I may have been confused. I 100% agree that
> > > > > > > > > > we need a materialized variant for suppress(). Then, you
> > > > > > > > > > could do:
> > > > > > > > > > 
> > > > > > > > > > ...suppress(..., Materialized.as("final-count"))
> > > > > > > > > 
> > > > > > > > > If that’s your proposal, then we are on the same page.
> > > > > > > > > 
> > > > > > > > > I was under the impression that you wanted to expand the
> > > > > > > > > scope of the KIP to additionally allow querying the
> > > > > > > > > internal buffer, not just the result. Can you clarify
> > > > > > > > > whether you are proposing to allow querying the state of
> > > > > > > > > the internal buffer, the result, or both?
> > > > > > > > > 
> > > > > > > > > Thanks, John
> > > > > > > > > 
> > > > > > > > > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > > > > > > > > Hi John, Thanks for your kind explanation with an
> > > > > > > > > > example.
> > > > > > > > > > 
> > > > > > > > > > > > But it feels like you're saying you're trying to do
> > > > > > > > > > > > something different than just query the windowed key
> > > > > > > > > > > > and get back the current count?
> > > > > > > > > > 
> > > > > > > > > > > Yes, for example, what if we need to retrieve the (all
> > > > > > > > > > > or range) keys with a closed window? In this example,
> > > > > > > > > > > let's imagine we need to retrieve only (key=A,
> > > > > > > > > > > window=10), not (key=A, window=20).
> > > > > > > > > > 
> > > > > > > > > > > Of course, the value accompanied by a flushed key is
> > > > > > > > > > > exactly the same as the one in the upstream KTable.
> > > > > > > > > > > However, if our intention is not to point at a specific
> > > > > > > > > > > key but to retrieve a group of unspecified keys, we are
> > > > > > > > > > > stuck, since we can't know beforehand which keys have
> > > > > > > > > > > been flushed out.
> > > > > > > > > > 
> > > > > > > > > > > One workaround would be materializing it with
> > > > > > > > > > > `suppressed.filter((k, v) -> true, Materialized.as("final-count"))`.
> > > > > > > > > > > But I think providing a materialized variant for the
> > > > > > > > > > > suppress method is better than this workaround.
> > > > > > > > > > 
> > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > 
> > > > > > > > > > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler
> > > > > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > > > > 
> > > > > > > > > > > Thanks for the response, Dongjin,
> > > > > > > > > > > 
> > > > > > > > > > > > I'm sorry, but I'm still not following. It seems like
> > > > > > > > > > > > the view you would get on the "current state of the
> > > > > > > > > > > > buffer" would always be equivalent to the view of the
> > > > > > > > > > > > upstream table.
> > > > > > > > > > > 
> > > > > > > > > > > Let me try an example, and maybe you can point out
> > > > > > > > > > > the flaw in my reasoning.
> > > > > > > > > > > 
> > > > > > > > > > > Let's say we're doing 10 ms windows with a grace
> > > > > > > > > > > period of zero. Let's also say we're computing a
> > > > > > > > > > > windowed count, and that we have a "final results"
> > > > > > > > > > > > suppression after the count. Let's materialize the
> > > > > > > > > > > count as "Count" and the suppressed result as "Final
> > > > > > > > > > > Count".
> > > > > > > > > > > 
> > > > > > > > > > > Suppose we get an input event: (time=10, key=A,
> > > > > > > > > > > value=...)
> > > > > > > > > > > 
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > 
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > 
> > > > > > > > > > > The record is still buffered because the window
> > > > > > > > > > > isn't closed yet. Final Count is an empty table:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > 
> > > > > > > > > > > ---------------
> > > > > > > > > > > 
> > > > > > > > > > > Now, we get a second event: (time=15, key=A,
> > > > > > > > > > > value=...)
> > > > > > > > > > > 
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > 
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > 
> > > > > > > > > > > The record is still buffered because the window
> > > > > > > > > > > isn't closed yet. Final Count is an empty table:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > 
> > > > > > > > > > > ---------------
> > > > > > > > > > > 
> > > > > > > > > > > Finally, we get a third event: (time=20, key=A,
> > > > > > > > > > > value=...)
> > > > > > > > > > > 
> > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > 
> > > > > > > > > > > The (internal) suppression buffer will contain:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > 
> > > > > > > > > > > Note that window 10 has been flushed out, because
> > > > > > > > > > > it's now closed. And window 20 is buffered because it
> > > > > > > > > > > isn't closed yet. Final Count is now:
> > > > > > > > > > > 
> > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > 
> > > > > > > > > > > ---------------
> > > > > > > > > > > 
> > > > > > > > > > > > Reading your email, I can't figure out what value
> > > > > > > > > > > > there is in querying the internal suppression buffer,
> > > > > > > > > > > > since it only contains exactly the same value as the
> > > > > > > > > > > > upstream table, for each key that is still buffered.
> > > > > > > > > > > > But it feels like you're saying you're trying to do
> > > > > > > > > > > > something different than just query the windowed key
> > > > > > > > > > > > and get back the current count?
> > > > > > > > > > > 
> > > > > > > > > > > Thanks, -John
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> > > > > > > > > > > > Hi John,
> > > > > > > > > > > > 
> > > > > > > > > > > > > 'The intermediate state of the suppression' in the
> > > > > > > > > > > > > KIP does not mean the state of the upstream KTable.
> > > > > > > > > > > > > Sure, the state of the upstream KTable can be
> > > > > > > > > > > > > queried by materializing the operator immediately
> > > > > > > > > > > > > before the suppress, as you showed. What I meant in
> > > > > > > > > > > > > the KIP was the final state of the buffer, which is
> > > > > > > > > > > > > not emitted yet. (I agree, the current description
> > > > > > > > > > > > > may be confusing; it would be better to change it
> > > > > > > > > > > > > to 'the current state of the suppression' or 'the
> > > > > > > > > > > > > results of the suppression', like the Jira issue
> > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-8403>
> > > > > > > > > > > > > states.)
> > > > > > > > > > > > 
> > > > > > > > > > > > > For a little bit more about the motivation, here is
> > > > > > > > > > > > > one of my experiences: I had to build a monitoring
> > > > > > > > > > > > > application which collects signals from IoT devices
> > > > > > > > > > > > > (say, a semiconductor production line). If the
> > > > > > > > > > > > > number of collected signals within the time window
> > > > > > > > > > > > > is much less than expected, there may be some
> > > > > > > > > > > > > problem in the systems, like a network hiccup. We
> > > > > > > > > > > > > wanted to build the system in the form of a
> > > > > > > > > > > > > dashboard, but could not because of the lack of a
> > > > > > > > > > > > > materializing feature. It was precisely the case of
> > > > > > > > > > > > > querying only the final results of a windowed
> > > > > > > > > > > > > aggregation, as the Jira issue
> > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-8403>
> > > > > > > > > > > > > states. We finally ended up implementing the system
> > > > > > > > > > > > > as an email alerting system like this
> > > > > > > > > > > > > <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> > > > > > > > > > > > > and had to collect the keys and windows of trouble
> > > > > > > > > > > > > by hand.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I think these kinds of use cases would be quite
> > > > > > > > > > > > > common. Should they be described in the KIP in more
> > > > > > > > > > > > > detail?
> > > > > > > > > > > > 
> > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > 
> > > > > > > > > > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler
> > > > > > > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Can you explain more about why the internal data
> > > > > > > > > > > > > > structures of suppression should be queriable?
> > > > > > > > > > > > > > The motivation just says that users might want to
> > > > > > > > > > > > > > do it, which seems like it could justify
> > > > > > > > > > > > > > literally anything :)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > One design point of Suppression is that if you
> > > > > > > > > > > > > > wanted to query the “final state”, you can
> > > > > > > > > > > > > > Materialize the suppress itself (which is why it
> > > > > > > > > > > > > > needs the variant); if you wanted to query the
> > > > > > > > > > > > > > “intermediate state”, you can materialize the
> > > > > > > > > > > > > > operator immediately before the suppress.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Example:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > ...count(Materialized.as("intermediate"))
> > > > > > > > > > > > > >    .suppress(untilWindowCloses(...), Materialized.as("final"))
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I’m not sure what use case would require
> > > > > > > > > > > > > actually fetching from the internal buffers.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin Lee
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > Hi devs,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > I'd like to reboot the discussion on KIP-508,
> > > > > > > > > > > > > > > which aims to support a Materialized variant of
> > > > > > > > > > > > > > > KTable#suppress. It was initially submitted
> > > > > > > > > > > > > > > several months ago but closed due to
> > > > > > > > > > > > > > > inactivity.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > - KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > > > > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > All kinds of feedback will be greatly
> > > > > > > > > > > > > > appreciated.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Best, Dongjin
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > -- *Dongjin Lee*
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > *A hitchhiker in the mathematical world.*
> > > > > > > > > > > > > > > *github: github.com/dongjinleekr <https://github.com/dongjinleekr>
> > > > > > > > > > > > > > > linkedin: kr.linkedin.com/in/dongjinleekr <https://kr.linkedin.com/in/dongjinleekr>
> > > > > > > > > > > > > > > speakerdeck: speakerdeck.com/dongjin <https://speakerdeck.com/dongjin>*
> > > > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > 
> > > -----BEGIN PGP SIGNATURE-----
> > > 
> > > iQIzBAEBCgAdFiEEI8mthP+5zxXZZdDSO4miYXKq/OgFAl5kNdkACgkQO4miYXKq
> > > /OjPWw/9EOLnzNMz1UvqmX4P6sLyA++QURhDKRbcuX2CUKHTWzi8A1Zks+CXL8yC
> > > JkrKb1sPSS5yi4QEAKOosEjvK7xr+VydDdWb3MM2IrnBh6kUOw/2CCuL7W+nbf0s
> > > Y7Uq9SJ161izgt4ZJ4OEHqAYK1VfSVszhVIvGCkksBwjrra8wpf7hcprwHguJR9B
> > > 397yMXa2vx/RWZY1Yu8zhhdedVaIcLBEiRUkjt3BlafyyXfGHY1h2XDWuzfuY9pB
> > > 0Uf5Oft3+ifi62T8ZXRLaB3+6qtojFc8hucZ83VYEhM0K010ZJVIItLcKl09gAow
> > > fyLYVwbpihM4qMfFaIoMDtA/mA+K65QgfXS4oMyesEX8aL473PdEYXLipSl2MTfB
> > > +WeEgN4wWq1M1PwzDjuJ1R1MVZGttASXPAkZGEwqJpnW5QMwn1Ofy0dFT/smI5zP
> > > w2aPl6otI4xwbkTOwkXAPbKCaQSB4+ibsPeFOKPTxpkUPAbbyWHusbD4Q26ick+c
> > > NGhWYPEkfQnUvoqmVl34ZB71PY5y5yj3vP+pGoFARTfuZ+bzqYHQ9NNWa+DyRCkn
> > > cnQNLhI8/TWOp8yj+ZH6i1THSONYfu0bDMnmyC8GOuBds932hgGzhfRtmZTFg4j2
> > > 02yVjYQIm65QUbSm6r7lrQLzlJ/OQyVuIoJf6IyxnoX6wxB4IiU=
> > > =St1e
> > > -----END PGP SIGNATURE-----
> > > 
> 
> 

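For anyone reading this thread in the archive: the 10 ms window example quoted above (Count, the internal suppression buffer, and Final Count) can be replayed with a small plain-Java model. This is only a sketch under stated assumptions, not Kafka Streams code. The class name SuppressSimulation, its fields, and its process method are hypothetical and made up for illustration; windows are assumed to be tumbling with a grace period of zero, and records that arrive for an already-closed window are simply dropped.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a windowed count followed by a "final results" suppression. */
final class SuppressSimulation {
    private final long windowSizeMs;
    private long streamTimeMs = Long.MIN_VALUE;

    // Each table maps windowStart -> (key -> count).
    final TreeMap<Long, TreeMap<String, Long>> count = new TreeMap<>();
    final TreeMap<Long, TreeMap<String, Long>> buffer = new TreeMap<>();
    final TreeMap<Long, TreeMap<String, Long>> finalCount = new TreeMap<>();

    SuppressSimulation(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Processes one input event; the event value is irrelevant for a count. */
    void process(long timestampMs, String key) {
        // Stream time only moves forward.
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;

        // Assumption: with zero grace, an event for a closed window is dropped.
        if (windowStart + windowSizeMs <= streamTimeMs) {
            return;
        }

        // Upstream table ("Count"): increment the windowed count.
        long updated = count.computeIfAbsent(windowStart, w -> new TreeMap<>())
                            .merge(key, 1L, Long::sum);

        // Suppression buffer: keep only the latest value per window and key.
        buffer.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, updated);

        // Flush every window whose end is at or before stream time.
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> entry = it.next();
            if (entry.getKey() + windowSizeMs > streamTimeMs) {
                break; // windows are sorted by start, so the rest are still open
            }
            finalCount.computeIfAbsent(entry.getKey(), w -> new TreeMap<>())
                      .putAll(entry.getValue());
            it.remove();
        }
    }

    public static void main(String[] args) {
        SuppressSimulation sim = new SuppressSimulation(10L);
        long[] timestamps = {10L, 15L, 20L};
        for (long ts : timestamps) {
            sim.process(ts, "A");
            System.out.println("time=" + ts
                + " count=" + sim.count
                + " buffer=" + sim.buffer
                + " finalCount=" + sim.finalCount);
        }
    }
}
```

After the three events from the example (time=10, 15, 20, all with key A), finalCount in this model holds only window 10 with a count of 2, and the buffer holds only window 20. Every entry still in the buffer is also present, with the same value, in the upstream count table, which is the point made in the thread.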