Hey Matthias,

I’m sorry if the ticket was poorly stated. The ticket is to add a DSL overload
that passes a Materialized argument to suppress(), so that the result of the
suppression becomes queryable.

This is unrelated to “persistent buffer” aka “spill-to-disk”.

There was some confusion before about whether this ticket could be implemented
as “query the buffer”. Maybe it can, but not trivially. The obvious way is to
add a new state store that we write the results into just before we forward
them. In other words, it’s exactly like the materialized variant of any
stateless KTable operation (e.g., filter() or mapValues() with a Materialized
argument).
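A minimal plain-Java sketch of that "write into a store just before forwarding" idea follows. It is an illustrative model only, not Kafka Streams code: `MaterializingForwarder`, `forward`, and `get` are hypothetical names, the store is a plain in-memory map, and the downstream processor is just a callback. It shows that the store always mirrors exactly what the suppression has emitted, so a point lookup returns the suppressed result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Illustrative model only (not Kafka Streams code; all names are hypothetical):
 * every record the suppression emits is first written into a dedicated result
 * store and then forwarded, like a materialized stateless KTable operation.
 */
class MaterializingForwarder<K, V> {
    // Stand-in for the new, queryable state store (a plain in-memory map here).
    private final Map<K, V> resultStore = new HashMap<>();
    // Stand-in for the downstream processor.
    private final BiConsumer<K, V> downstream;

    MaterializingForwarder(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    /** Called when the suppression buffer emits a record. */
    void forward(K key, V value) {
        if (value == null) {
            resultStore.remove(key);     // a tombstone deletes the row from the result table
        } else {
            resultStore.put(key, value); // write the result into the store first...
        }
        downstream.accept(key, value);   // ...then forward it downstream
    }

    /** Point lookup against the materialized result, as an interactive query would do. */
    V get(K key) {
        return resultStore.get(key);
    }
}
```

The buffer itself is never exposed here; only emitted results become visible, which is the distinction this thread converges on.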

Thanks,
John

On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote:
> 
> Thanks for the KIP Dongjin,
> 
> I am still not sure I can follow, which might also be caused by the
> backing Jira ticket (maybe John can clarify the intent of the ticket,
> as he created it):
> 
> Currently, suppress() only uses an in-memory buffer, and my
> understanding of the Jira ticket is that it asks for the ability to use
> a persistent buffer (i.e., spill to disk, backed by RocksDB).
> 
> Adding a persistent buffer is completely unrelated to allowing queries
> against the buffer. In fact, one could query an in-memory buffer, too.
> However, querying the buffer does not really seem to be useful, as
> John pointed out, because you can always query the upstream KTable store.
> 
> Also note that for the emit-on-window-close case, the result is deleted
> from the buffer when it is emitted, and thus cannot be queried any longer.
> 
> 
> Can you please clarify whether you intend to allow spilling to disk or
> to enable IQ (even though I don't see why querying makes sense, as the
> data is either upstream or deleted)? Also, if you want to enable IQ,
> why do we need all those new interfaces? The result of suppress() is a
> KTable like any other, backed by a key-value/windowed/session store,
> isn't it?
> 
> We should also have corresponding Jira tickets for different cases to
> avoid the confusion I am in atm :)
> 
> 
> -Matthias
> 
> 
> On 2/27/20 8:21 AM, John Roesler wrote:
> > Hi Dongjin,
> >
> > No problem; glad we got it sorted out.
> >
> > Thanks again for picking this up! -John
> >
> > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> >>> I was under the impression that you wanted to expand the scope
> >>> of the KIP to additionally allow querying the internal buffer,
> >>> not just the result. Can you clarify whether you are proposing
> >>> to allow querying the state of the internal buffer, the result,
> >>> or both?
> >>
> >> Sorry for the confusion. As we have already discussed, we only need
> >> to query the suppressed output, not the internal buffer. The
> >> current implementation is wrong. I must have been confused, too; I
> >> will notify you after refining the KIP and the implementation
> >> accordingly.
> >>
> >> Thanks, Dongjin
> >>
> >> On Tue, Feb 25, 2020 at 12:17 AM John Roesler
> >> <vvcep...@apache.org> wrote:
> >>
> >>> Hi Dongjin,
> >>>
> >>> Ah, I think I may have been confused. I 100% agree that we need
> >>> a materialized variant of suppress(). Then, you could do:
> >>> ...suppress(..., Materialized.as("final-count"))
> >>>
> >>> If that’s your proposal, then we are on the same page.
> >>>
> >>> I was under the impression that you wanted to expand the scope
> >>> of the KIP to additionally allow querying the internal buffer,
> >>> not just the result. Can you clarify whether you are proposing
> >>> to allow querying the state of the internal buffer, the result,
> >>> or both?
> >>>
> >>> Thanks, John
> >>>
> >>> On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> >>>> Hi John, Thanks for your kind explanation with an example.
> >>>>
> >>>>> But it feels like you're saying you're trying to do
> >>>>> something different than just query the windowed key and get
> >>>>> back the current count?
> >>>>
> >>>> Yes. For example, what if we need to retrieve all keys (or a range
> >>>> of keys) whose window is closed? In this example, let's imagine we
> >>>> need to retrieve only (key=A, window=10), not (key=A, window=20).
> >>>>
> >>>> Of course, the value accompanying a flushed key is exactly the
> >>>> same as the one in the upstream KTable. However, if our intention
> >>>> is not to look up a specific key but to retrieve a group of
> >>>> unspecified keys, we get stuck, since we can't know beforehand
> >>>> which keys have been flushed out.
> >>>>
> >>>> One workaround would be materializing it with
> >>>> `suppressed.filter((key, value) -> true, Materialized.as("final-count"))`.
> >>>> But I think providing a materialized variant of the suppress method
> >>>> is better than this workaround.
> >>>>
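The difference Dongjin describes can be modeled in a few lines of plain Java. This is an assumption-laden toy, not the Kafka Streams window-store API: `WindowedTable` and `scanRange` are hypothetical names, and rows are indexed by window start and then by key so that a time range can be scanned without knowing any key in advance. Filled with the state reached at the end of John's worked example in this thread, the same scan over the upstream Count table also returns the still-open window 20, while the scan over a materialized Final Count table returns only the closed window 10.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Toy in-memory windowed table (hypothetical, not the Kafka Streams API):
 * rows are indexed by window start, then by record key, so a caller can
 * scan every key in a time range without knowing the keys in advance.
 */
class WindowedTable {
    private final NavigableMap<Long, Map<String, Long>> byWindowStart = new TreeMap<>();

    void put(String key, long windowStart, long value) {
        byWindowStart.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, value);
    }

    /** Returns "key@windowStart=value" for all rows with windowStart in [from, to], in time order. */
    List<String> scanRange(long from, long to) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> window
                : byWindowStart.subMap(from, true, to, true).entrySet()) {
            for (Map.Entry<String, Long> row : window.getValue().entrySet()) {
                rows.add(row.getKey() + "@" + window.getKey() + "=" + row.getValue());
            }
        }
        return rows;
    }
}
```

A dashboard that only wants finished windows (for instance, windows whose signal count fell short of expectations) would scan the final table; scanning the upstream table would also surface windows that are still accumulating.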
> >>>> Thanks, Dongjin
> >>>>
> >>>> On Thu, Feb 20, 2020 at 1:26 AM John Roesler
> >>>> <vvcep...@apache.org> wrote:
> >>>>
> >>>>> Thanks for the response, Dongjin,
> >>>>>
> >>>>> I'm sorry, but I'm still not following. It seems like the view
> >>>>> you would get on the "current state of the buffer" would always
> >>>>> be equivalent to the view of the upstream table.
> >>>>>
> >>>>> Let me try an example, and maybe you can point out the flaw
> >>>>> in my reasoning.
> >>>>>
> >>>>> Let's say we're doing 10 ms windows with a grace period of
> >>>>> zero. Let's also say we're computing a windowed count, and
> >>>>> that we have a "final results" suppression after the count.
> >>>>> Let's materialize the count as "Count" and the suppressed
> >>>>> result as "Final Count".
> >>>>>
> >>>>> Suppose we get an input event: (time=10, key=A, value=...)
> >>>>>
> >>>>> Then, Count will look like:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     1 |
> >>>>>
> >>>>> The (internal) suppression buffer will contain:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     1 |
> >>>>>
> >>>>> The record is still buffered because the window isn't
> >>>>> closed yet. Final Count is an empty table:
> >>>>>
> >>>>> | window | key | value |
> >>>>>
> >>>>> ---------------
> >>>>>
> >>>>> Now, we get a second event: (time=15, key=A, value=...)
> >>>>>
> >>>>> Then, Count will look like:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     2 |
> >>>>>
> >>>>> The (internal) suppression buffer will contain:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     2 |
> >>>>>
> >>>>> The record is still buffered because the window isn't
> >>>>> closed yet. Final Count is an empty table:
> >>>>>
> >>>>> | window | key | value |
> >>>>>
> >>>>>
> >>>>> ---------------
> >>>>>
> >>>>> Finally, we get a third event: (time=20, key=A, value=...)
> >>>>>
> >>>>> Then, Count will look like:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     2 |
> >>>>> | 20     | A   |     1 |
> >>>>>
> >>>>> The (internal) suppression buffer will contain:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 20     | A   |     1 |
> >>>>>
> >>>>> Note that window 10 has been flushed out, because it's now
> >>>>> closed. And window 20 is buffered because it isn't closed
> >>>>> yet. Final Count is now:
> >>>>>
> >>>>> | window | key | value |
> >>>>> | 10     | A   |     2 |
> >>>>>
> >>>>>
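The three-step timeline above can be replayed with a small single-key model in plain Java. It is an illustration under the example's own assumptions (10 ms tumbling windows, zero grace period, one key, non-negative timestamps), not the actual suppression implementation; `FinalCountSimulation` and its fields are hypothetical names. A window is treated as closed once stream time reaches window end plus grace, and a closed window is removed from the buffer at the same moment it is written to Final Count.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy single-key model of the example (hypothetical, not Kafka Streams code):
 * 10 ms tumbling windows, zero grace, a windowed count, and an
 * "until window closes" suppression feeding a Final Count table.
 */
class FinalCountSimulation {
    static final long WINDOW_SIZE_MS = 10;
    static final long GRACE_MS = 0;

    // Window start -> running count (the upstream "Count" table).
    final Map<Long, Long> count = new TreeMap<>();
    // Window start -> latest count not yet emitted (the internal suppression buffer).
    final TreeMap<Long, Long> buffer = new TreeMap<>();
    // Window start -> emitted final count (the materialized "Final Count" table).
    final Map<Long, Long> finalCount = new TreeMap<>();
    // Highest timestamp seen so far.
    long streamTime = Long.MIN_VALUE;

    void process(long timestamp) {
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
        long updated = count.merge(windowStart, 1L, Long::sum);
        buffer.put(windowStart, updated); // every update replaces the buffered value
        streamTime = Math.max(streamTime, timestamp);
        // Emit each buffered window whose end + grace <= stream time, oldest first.
        while (!buffer.isEmpty()
                && buffer.firstKey() + WINDOW_SIZE_MS + GRACE_MS <= streamTime) {
            Map.Entry<Long, Long> closed = buffer.pollFirstEntry(); // deleted from the buffer on emit
            finalCount.put(closed.getKey(), closed.getValue());
        }
    }
}
```

Feeding it timestamps 10, 15, and 20 reproduces the tables above: Count holds windows 10 and 20, the buffer holds only window 20, and Final Count holds only window 10 with a count of 2.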
> >>>>> ---------------
> >>>>>
> >>>>> Reading your email, I can't figure out what value there is in
> >>>>> querying the internal suppression buffer, since it only contains
> >>>>> exactly the same value as the upstream table, for each key that is
> >>>>> still buffered. But it feels like you're saying you're trying to do
> >>>>> something different than just query the windowed key and get back
> >>>>> the current count?
> >>>>>
> >>>>> Thanks, -John
> >>>>>
> >>>>>
> >>>>> On Wed, Feb 19, 2020, at 09:49, Dongjin Lee wrote:
> >>>>>> Hi John,
> >>>>>>
> >>>>>> 'The intermediate state of the suppression' in the KIP does not
> >>>>>> mean the state of the upstream KTable; sure, the state of the
> >>>>>> upstream KTable can be queried by materializing the operator
> >>>>>> immediately before the suppress, as you showed. What I meant in
> >>>>>> the KIP was the final state of the buffer, which has not been
> >>>>>> emitted yet. (I agree, the current description may be confusing;
> >>>>>> it would be better to change it to 'the current state of the
> >>>>>> suppression' or 'the results of the suppression', as the Jira
> >>>>>> issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.)
> >>>>>>
> >>>>>> For a little more about the motivation, here is one of my
> >>>>>> experiences: I had to build a monitoring application that
> >>>>>> collects signals from IoT devices (say, on a semiconductor
> >>>>>> production line). If the number of signals collected within a
> >>>>>> time window is much lower than expected, there may be a problem
> >>>>>> in the system, such as a network hiccup. We wanted to build the
> >>>>>> system as a dashboard, but could not, for lack of a
> >>>>>> materialization feature. It was precisely the case of querying
> >>>>>> only the final results of a windowed aggregation, as the Jira
> >>>>>> issue <https://issues.apache.org/jira/browse/KAFKA-8403> states.
> >>>>>> We finally ended up implementing it as an email alerting system
> >>>>>> like this
> >>>>>> <https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> >>>>>> and had to collect the problematic keys and windows by hand.
> >>>>>>
> >>>>>> I think these kinds of use cases would be quite common.
> >>>>>> Should this be described in the KIP in more detail?
> >>>>>>
> >>>>>> Thanks, Dongjin
> >>>>>>
> >>>>>> On Sat, Feb 15, 2020 at 4:43 AM John Roesler
> >>>>>> <vvcep...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Dongjin,
> >>>>>>>
> >>>>>>> Thanks for the KIP!
> >>>>>>>
> >>>>>>> Can you explain more about why the internal data structures of
> >>>>>>> suppression should be queryable? The motivation just says that
> >>>>>>> users might want to do it, which seems like it could justify
> >>>>>>> literally anything :)
> >>>>>>>
> >>>>>>> One design point of Suppression is that if you want to query the
> >>>>>>> “final state”, you can materialize the suppress itself (which is
> >>>>>>> why it needs the variant); if you want to query the “intermediate
> >>>>>>> state”, you can materialize the operator immediately before the
> >>>>>>> suppress.
> >>>>>>>
> >>>>>>> Example:
> >>>>>>>
> >>>>>>> ...count(Materialized.as("intermediate"))
> >>>>>>>     .suppress(untilWindowCloses(unbounded()), Materialized.as("final"))
> >>>>>>>
> >>>>>>> I’m not sure what use case would require actually
> >>>>>>> fetching from the internal buffers.
> >>>>>>>
> >>>>>>> Thanks, John
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Feb 14, 2020, at 07:55, Dongjin Lee wrote:
> >>>>>>>> Hi devs,
> >>>>>>>>
> >>>>>>>> I'd like to reboot the discussion on KIP-508, which aims to
> >>>>>>>> support a Materialized variant of KTable#suppress. It was
> >>>>>>>> initially submitted several months ago but was closed due to
> >>>>>>>> inactivity.
> >>>>>>>>
> >>>>>>>> - KIP:
> >>>>>>>>   https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> >>>>>>>> - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> >>>>>>>>
> >>>>>>>> All kinds of feedback will be greatly appreciated.
> >>>>>>>>
> >>>>>>>> Best, Dongjin
> >>>>>>>>
> >>>>>>>> -- *Dongjin Lee*
> >>>>>>>>
> >>>>>>>> *A hitchhiker in the mathematical world.*
> >>>>>>>> github: <https://github.com/dongjinleekr>
> >>>>>>>> linkedin: <https://kr.linkedin.com/in/dongjinleekr>
> >>>>>>>> speakerdeck: <https://speakerdeck.com/dongjin>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>
> >>
>
