Hi Dongjin,

Thanks for presenting these options. The concern that
Matthias brought up is a very deep problem that afflicts all
operations downstream of windowing operations. It's the same
thing that derailed KIP-300. For the larger context, I have
developed a couple of approaches to resolve this situation,
but I think it makes sense to finish up KIP-478 before
presenting them.

However, I don't think we need to block the current
proposal on solving that long-running, deep issue with the
DSL. Instead, we should make a top-level decision whether
to:

A: Make Suppress just like all the other KTable operations.
It will have the same pathological behavior: the keyset is
unbounded while the store implementation is only a
KeyValueStore. Again, this exact pathology currently affects
all KTable operations that follow from windowing operations.
For example, it applies to the current workaround that
Dongjin documented in the KIP:
suppress().filter(Materialized<KeyValueStore>). This is
Option 2 that Dongjin presented.

B: Do something different with Suppress in order to
sidestep the problem. For example, Suppress does not _need_
to have a separate state store at all. If we just give
people a switch to make the operation queriable, we can
implement a ReadOnlyKeyValueStore interface by querying the
"priorValue" of the buffer first and then querying the
upstream ValueGetter. This broad category of "do something
different with Suppress" encompasses Option 1 and Option 3
that Dongjin presented.
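
To make Choice B concrete, the lookup order could look
roughly like the sketch below. It is plain Java with no
Kafka dependency: `Buffer`, `SuppressedView`, and the
"key@windowStart" string keys are hypothetical stand-ins,
not the real buffer or ValueGetter classes. The assumption
is that a key still held in the buffer reads as its prior
(last emitted) value, and any other key reads through to
the upstream table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SuppressedViewSketch {

    /** Hypothetical stand-in for the suppression buffer. */
    static final class Buffer<K, V> {
        // Buffered key -> value last emitted downstream (null if never emitted).
        private final Map<K, V> priorValues = new HashMap<>();

        void buffer(K key, V priorValue) { priorValues.put(key, priorValue); }

        void flush(K key) { priorValues.remove(key); }

        boolean isBuffered(K key) { return priorValues.containsKey(key); }

        V priorValue(K key) { return priorValues.get(key); }
    }

    /** Read-only view of the suppressed table without a separate state store. */
    static final class SuppressedView<K, V> {
        private final Buffer<K, V> buffer;
        private final Function<K, V> upstreamGetter; // stand-in for the upstream ValueGetter

        SuppressedView(Buffer<K, V> buffer, Function<K, V> upstreamGetter) {
            this.buffer = buffer;
            this.upstreamGetter = upstreamGetter;
        }

        V get(K key) {
            // A buffered key has not been emitted yet, so the view shows its prior value.
            if (buffer.isBuffered(key)) {
                return buffer.priorValue(key);
            }
            // Anything not buffered has already been forwarded, so upstream is current.
            return upstreamGetter.apply(key);
        }
    }

    public static String demo() {
        Map<String, Long> upstream = new HashMap<>();
        upstream.put("A@10", 2L); // window 10 is closed and was flushed
        upstream.put("A@20", 1L); // window 20 is still open

        Buffer<String, Long> buffer = new Buffer<>();
        buffer.buffer("A@20", null); // buffered, nothing emitted for it yet

        SuppressedView<String, Long> view = new SuppressedView<>(buffer, upstream::get);
        return "A@10=" + view.get("A@10")
            + ", A@20=" + view.get("A@20")
            + ", B@10=" + view.get("B@10");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch only the closed window is visible through
the view; the open window and the unknown key both read as
null.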


Speaking personally, I think Choice A would be the most
obvious and least weird choice, but it presents a serious
risk of escalating the severity of the problem of unbounded
state. This is a risk that we're already aware of, but it
has not yet become a big problem in practice. As Matthias
pointed out, Suppress is far more likely to be used
downstream of windowed tables than other operations, so
having a Materialized<KVStore> overload carries a
significant risk of getting people into a bad state. Note
that broadly advertising the workaround from the KIP would
have the exact same impact, so we should be careful about
recommending it.
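
As a rough illustration of the unbounded-state risk, the
sketch below feeds one windowed record per window into a
plain key-value map and into a map that drops old windows.
The retention logic is a simplified assumption for
illustration, not how the real WindowStore is implemented;
the class name, window size, and retention are made up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class UnboundedKeysetSketch {
    static final long WINDOW_SIZE_MS = 10;
    static final long RETENTION_MS = 30;

    /** Returns {plain key-value size, retention-aware size} after one record per window. */
    public static int[] simulate(int windows) {
        // Plain key-value map: every (key, window) pair becomes a new key, forever.
        Map<String, Long> keyValueStore = new HashMap<>();
        // Retention-aware map: windowStart -> (key -> count), pruned as time advances.
        TreeMap<Long, Map<String, Long>> windowStore = new TreeMap<>();

        for (int i = 0; i < windows; i++) {
            long windowStart = i * WINDOW_SIZE_MS;
            keyValueStore.put("A@" + windowStart, 1L);
            windowStore.computeIfAbsent(windowStart, w -> new HashMap<>()).put("A", 1L);
            // Drop every window that started before the retention cutoff.
            windowStore.headMap(windowStart - RETENTION_MS, false).clear();
        }
        return new int[] {keyValueStore.size(), windowStore.size()};
    }

    public static void main(String[] args) {
        int[] sizes = simulate(1000);
        System.out.println("key-value entries: " + sizes[0]);
        System.out.println("windowed entries:  " + sizes[1]);
    }
}
```

With 1000 windows, the plain map ends up holding 1000
entries while the retention-aware map holds 4.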

Although it's not great to have "special snowflakes" in the
API, Choice B does seem safer in the short term. We would
basically be proposing a temporary API to make the
suppressed view queriable without a Materialized argument.
Then, once we fix the main KIP-300 problem, we would look at
converging Suppress with the rest of the KTable
materialization APIs.

WDYT?
Thanks,
-John


On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> Hi Matthias,
> 
> Thank you very much for the detailed feedback. Here are my opinions:
> 
> > Because there is no final result for non-windowed KTables, it seems that
> > this new feature only makes sense for the windowed-aggregation case?
> 
> I see it a little differently. Of course, for a windowed KTable, this
> feature provides the final state; for non-windowed KTables, it provides a
> view of the records received more than the predefined waiting time ago,
> excluding the records that are still waiting for more events.
> 
> > Thus, the signature of `Materialized` should take a `WindowStore` instead
> > of a `KeyValueStore`?
> 
> I reviewed the implementation following your comments and found the
> following:
> 
> 1. `Materialized` instance includes the following: KeySerde, ValueSerde,
> StoreSupplier, and Queriable Name.
> 2. The other `Materialized` method variants in `KTable` are making use of
> KeySerde, ValueSerde, and Queriable Name only. (That is, StoreSupplier is
> ignored.)
> 3. `KTable#suppress(Suppressed, Materialized)` uses the Queriable Name
> only. StoreSupplier is also ignored.
> 
> So, we have three choices for the method signature:
> 
> 1. `KTable#suppress(Suppressed, String)` (i.e., passing the Queriable Name
> only):
> 
>   This is the simplest; however, it is inconsistent with the other
> Materialized variant methods.
> 
> 2. `KTable#suppress(Suppressed, Materialized<K, V, KeyValueStore>)` (i.e.,
> current proposal)
> 
>   This approach is harmless at this point, since StoreSupplier is ignored;
> however, since suppression can be applied to both `KeyValueStore` and
> `WindowStore`, this approach is not only weird but also leaves some
> potential risk for the future. (On second thought, I agree, this API design
> is bad and dangerous.)
> 
> 3. `KTable#suppress(Suppressed, Materialized<K, V, StateStore>)`
> 
>   This approach covers both the `KeyValueStore` and `WindowStore` cases.
> Since the concrete class of the `Suppressed` instance differs by
> `StateStore` type[^1], it seems like we can validate the arguments on the
> method call (e.g., throw `IllegalArgumentException` when a `Suppressed`
> instance for `KeyValueStore` is given with a `Materialized` instance for
> `WindowStore`). This approach not only breaks the API consistency but also
> guards against misuse of the API.
> 
> What do you think? I am now trying the third approach.
> 
> Thanks,
> Dongjin
> 
> [^1]: `SuppressedInternal` for `KeyValueStore` and
> `FinalResultsSuppressionBuilder` for `WindowStore`.
> 
> On Sat, Sep 12, 2020 at 3:29 AM Matthias J. Sax <mj...@apache.org> wrote:
> 
> > Thanks for updating the KIP.
> > 
> > I think there is still one open question. `suppress()` can be used on
> > non-windowed KTable for rate control, as well as on a windowed-KTable
> > (also for rate control, but actually mainly) for only emitting the final
> > result of a windowed aggregation. For the non-windowed case, we use a
> > KeyValueStore while for the windowed cases, we use a WindowStore.
> > 
> > Because there is no final result for non-windowed KTables, it seems that
> > this new feature only makes sense for the windowed-aggregation case?
> > Thus, the signature of `Materialized` should take a `WindowStore`
> > instead of a `KeyValueStore`?
> > 
> > If that's correct, I am wondering:
> > 
> >  - Can we guard against misuse of the API if the upstream KTable is
> > not windowed (or maybe it's not necessary to guard)?
> >  - Can we actually implement it? We had issues with regard to KIP-300 to
> > materialize windowed-KTables?
> > 
> > It would be worth clarifying upfront. Maybe we even need a POC
> > implementation to verify that it works?
> > 
> > 
> > -Matthias
> > 
> > 
> > On 9/11/20 12:26 AM, Dongjin Lee wrote:
> > > Hi All,
> > > 
> > > Here is the voting thread:
> > > 
> > 
> > https://lists.apache.org/thread.html/r5653bf2dafbb27b247bf20dbe6f070c151b3823d96c9c9ca94183e20%40%3Cdev.kafka.apache.org%3E
> > > Thanks,
> > > Dongjin
> > > 
> > > On Fri, Sep 11, 2020 at 4:23 PM Dongjin Lee <dong...@apache.org> wrote:
> > > 
> > > > Hi John,
> > > > 
> > > > Thanks for the feedback. I will open the Vote thread now.
> > > > 
> > > > Best,
> > > > Dongjin
> > > > 
> > > > > On Fri, Sep 11, 2020 at 2:00 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > Hi Dongjin,
> > > > > 
> > > > > Sorry for the delay. I'm glad you're still pushing this
> > > > > forward. It would be nice to get this in to the 2.7 release.
> > > > > 
> > > > > I just took another look at the KIP, and it looks good to
> > > > > me!
> > > > > 
> > > > > I think this is ready for a vote.
> > > > > 
> > > > > Thanks,
> > > > > -John
> > > > > 
> > > > > On Wed, 2020-08-05 at 22:04 +0900, Dongjin Lee wrote:
> > > > > > Hi All,
> > > > > > 
> > > > > > > I updated the KIP
> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable>
> > > > > > > and the implementation, following the discussion here.
> > > > > > 
> > > > > > > You must be working hard preparing the release of 2.6.0, so please
> > > > > > > have a look after your work is done.
> > > > > > 
> > > > > > Thanks,
> > > > > > Dongjin
> > > > > > 
> > > > > > > On Sun, Mar 8, 2020 at 12:20 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > Thanks Matthias,
> > > > > > > 
> > > > > > > Good idea. I've changed the ticket name and added a note
> > > > > > > clarifying that this ticket is not the same as
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-7224
> > > > > > > 
> > > > > > > Incidentally, I learned that I never documented my reasons
> > > > > > > for abandoning my work on KAFKA-7224 ! I've now updated
> > > > > > > that ticket, too, so your question had an unexpected side-benefit.
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > > 
> > > > > > > On Sat, Mar 7, 2020, at 18:01, Matthias J. Sax wrote:
> > > Thanks for clarification.
> > > 
> > > Can you maybe update the Jira ticket? Do we have a ticket for
> > > spill-to-disk? Maybe link to it and explain that it's two different
> > > things? Maybe even rename the ticket to something more clear, ie,
> > > > "make suppress result queryable" or similar?
> > > 
> > > 
> > > -Matthias
> > > 
> > > On 3/7/20 1:58 PM, John Roesler wrote:
> > > > > > > > > > Hey Matthias,
> > > > > > > > > > 
> > > > > > > > > > I’m sorry if the ticket was poorly stated. The ticket is to 
> > > > > > > > > > add a
> > > DSL overload to pass a Materialized argument to suppress. As a
> > > > > > result,
> > > the result of the suppression would be queriable.
> > > > > > > > > > This is unrelated to “persistent buffer” aka 
> > > > > > > > > > “spill-to-disk”.
> > > > > > > > > > 
> > > > > > > > > > There was some confusion before about whether this ticket 
> > > > > > > > > > could be
> > > implemented as “query the buffer”. Maybe it can, but not trivially.
> > > The obvious way is just to add a new state store which we write the
> > > results into just before we forward. I.e., it’s exactly like the
> > > materialized variant of any stateless KTable operation.
> > > > > > > > > > Thanks, John
> > > > > > > > > > 
> > > > > > > > > > On Sat, Mar 7, 2020, at 15:32, Matthias J. Sax wrote: 
> > > > > > > > > > Thanks for
> > > > > > > > > > the KIP Dongjin,
> > > > > > > > > > 
> > > > > > > > > > I am still not sure if I can follow, what might also be 
> > > > > > > > > > caused by
> > > > > > > > > > the backing JIRA ticket (maybe John can clarify the intent 
> > > > > > > > > > of the
> > > > > > > > > > ticket as he created it):
> > > > > > > > > > 
> > > > > > > > > > Currently, suppress() only uses an in-memory buffer and my
> > > > > > > > > > understanding of the Jira is, to add the ability to use a
> > > > > > > > > > persistent buffer (ie, spill to disk backed by RocksDB).
> > > > > > > > > > 
> > > > > > > > > > Adding a persistent buffer is completely unrelated to allow
> > > > > > > > > > querying the buffer. In fact, one could query an in-memory 
> > > > > > > > > > buffer,
> > > > > > > > > > too. However, querying the buffer does not really seem to be
> > > > > > useful
> > > > > > > > > > as pointed out by John, as you can always query the upstream
> > > > > > KTable
> > > > > > > > > > store.
> > > > > > > > > > 
> > > > > > > > > > Also note that for the emit-on-window-close case the result 
> > > > > > > > > > is
> > > > > > > > > > deleted from the buffer when it is emitted, and thus cannot 
> > > > > > > > > > be
> > > > > > > > > > > queried any longer.
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > Can you please clarify if you intend to allow spilling to 
> > > > > > > > > > disk or
> > > > > > > > > > > if you intend to enable IQ (even if I don't see why 
> > > > > > > > > > > querying makes
> > > > > > > > > > > sense, as the data is either upstream or deleted). Also, if 
> > > > > > > > > > you
> > > > > > > > > > want to enable IQ, why do we need all those new interfaces? 
> > > > > > > > > > The
> > > > > > > > > > result of a suppress() is a KTable that is the same as any 
> > > > > > > > > > other
> > > > > > > > > > key-value/windowed/sessions store?
> > > > > > > > > > 
> > > > > > > > > > We should also have corresponding Jira tickets for 
> > > > > > > > > > different cases
> > > > > > > > > > to avoid the confusion I am in atm :)
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > -Matthias
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > On 2/27/20 8:21 AM, John Roesler wrote:
> > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > No problem; glad we got it sorted out.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks again for picking this up! -John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Wed, Feb 26, 2020, at 09:24, Dongjin Lee wrote:
> > > > > > > > > > > > > > > I was under the impression that you wanted to 
> > > > > > > > > > > > > > > expand the
> > > > > > > > > > > > > > > scope of the KIP
> > > > > > > > > > > > > > to additionally allow querying the internal buffer, 
> > > > > > > > > > > > > > not
> > > > > > > > > > > > > > just the result. Can you clarify whether you are 
> > > > > > > > > > > > > > proposing
> > > > > > > > > > > > > > to allow querying the state of the internal buffer, 
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > result, or both?
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Sorry for the confusion. As we already talked with, 
> > > > > > > > > > > > > > we
> > > > > > only
> > > > > > > > > > > > > > need to query the suppressed output, not the 
> > > > > > > > > > > > > > internal
> > > > > > > > > > > > > > buffer. The current implementation is wrong. After
> > > > > > refining
> > > > > > > > > > > > > > the KIP and implementation accordingly I will 
> > > > > > > > > > > > > > notify you -
> > > > > > > > > > > > > > I must be confused, also.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Tue, Feb 25, 2020 at 12:17 AM John Roesler
> > > > > > > > > > > > > > <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Ah, I think I may have been confused. I 100% 
> > > > > > > > > > > > > > > agree that
> > > > > > > > > > > > > > > we need a materialized variant for suppress(). 
> > > > > > > > > > > > > > > Then, you
> > > > > > > > > > > > > > > could do: ...suppress(...,
> > > > > > > > > > > > > > > Materialized.as(“final-count”))
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > If that’s your proposal, then we are on the same 
> > > > > > > > > > > > > > > page.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > I was under the impression that you wanted to 
> > > > > > > > > > > > > > > expand the
> > > > > > > > > > > > > > > scope of the KIP to additionally allow querying 
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > internal buffer, not just the result. Can you 
> > > > > > > > > > > > > > > clarify
> > > > > > > > > > > > > > > whether you are proposing to allow querying the 
> > > > > > > > > > > > > > > state of
> > > > > > > > > > > > > > > the internal buffer, the result, or both?
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Thu, Feb 20, 2020, at 08:41, Dongjin Lee wrote:
> > > > > > > > > > > > > > > > Hi John, Thanks for your kind explanation with 
> > > > > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > example.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > But it feels like you're saying you're trying 
> > > > > > > > > > > > > > > > > to do
> > > > > > > > > > > > > > > > > something different
> > > > > > > > > > > > > > > > than just query the windowed key and get back 
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > current count?
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Yes, for example, what if we need to retrieve 
> > > > > > > > > > > > > > > > the (all
> > > > > > > > > > > > > > > > or range) keys
> > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > a closed window? In this example, let's imagine 
> > > > > > > > > > > > > > > > we
> > > > > > need
> > > > > > > > > > > > > > > > to retrieve only (key=A, window=10), not (key=A,
> > > > > > > > > > > > > > > > window=20).
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Of course, the value accompanied by a flushed 
> > > > > > > > > > > > > > > > key is
> > > > > > > > > > > > > > > > exactly the same to the one in the upstream 
> > > > > > > > > > > > > > > > KTable;
> > > > > > > > > > > > > > > > However, if our intention is not pointing out a
> > > > > > > > > > > > > > > > specific key but retrieving a group of 
> > > > > > > > > > > > > > > > unspecified
> > > > > > > > > > > > > > > > keys, we stuck
> > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > trouble - since we can't be sure which key is 
> > > > > > > > > > > > > > > > flushed
> > > > > > > > > > > > > > > > out beforehand.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > One workaround would be materializing it with
> > > > > > > > > > > > > > > > `suppressed.filter(e ->
> > > > > > > > > > > > > > > true,
> > > > > > > > > > > > > > > > Materialized.as("final-count"))`. But I think
> > > > > > providing
> > > > > > > > > > > > > > > > a materialized variant for suppress method is 
> > > > > > > > > > > > > > > > better
> > > > > > > > > > > > > > > > than this workaround.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Thu, Feb 20, 2020 at 1:26 AM John Roesler
> > > > > > > > > > > > > > > > <vvcep...@apache.org>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > Thanks for the response, Dongjin,
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I'm sorry, but I'm still not following. It 
> > > > > > > > > > > > > > > > > seems
> > > > > > like
> > > > > > > > > > > > > > > > > the view you
> > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > get on the "current state of the buffer" would
> > > > > > always
> > > > > > > > > > > > > > > > > be equivalent to the view of the upstream 
> > > > > > > > > > > > > > > > > table.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Let me try an example, and maybe you can 
> > > > > > > > > > > > > > > > > point out
> > > > > > > > > > > > > > > > > the flaw in my reasoning.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Let's say we're doing 10 ms windows with a 
> > > > > > > > > > > > > > > > > grace
> > > > > > > > > > > > > > > > > period of zero. Let's also say we're 
> > > > > > > > > > > > > > > > > computing a
> > > > > > > > > > > > > > > > > windowed count, and that we have a "final 
> > > > > > > > > > > > > > > > > results"
> > > > > > > > > > > > > > > > > suppression after the count. Let's  
> > > > > > > > > > > > > > > > > materialize the
> > > > > > > > > > > > > > > > > count as "Count" and the suppressed result as 
> > > > > > > > > > > > > > > > > "Final
> > > > > > > > > > > > > > > > > Count".
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Suppose we get an input event: (time=10, 
> > > > > > > > > > > > > > > > > key=A,
> > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > The (internal) suppression buffer will 
> > > > > > > > > > > > > > > > > contain:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     1 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > The record is still buffered because the 
> > > > > > > > > > > > > > > > > window
> > > > > > > > > > > > > > > > > isn't closed yet. Final Count is an empty 
> > > > > > > > > > > > > > > > > table:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Now, we get a second event: (time=15, key=A,
> > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > The (internal) suppression buffer will 
> > > > > > > > > > > > > > > > > contain:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > The record is still buffered because the 
> > > > > > > > > > > > > > > > > window
> > > > > > > > > > > > > > > > > isn't closed yet. Final Count is an empty 
> > > > > > > > > > > > > > > > > table:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Finally, we get a third event: (time=20, 
> > > > > > > > > > > > > > > > > key=A,
> > > > > > > > > > > > > > > > > value=...)
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Then, Count will look like:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > The (internal) suppression buffer will 
> > > > > > > > > > > > > > > > > contain:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 20     | A   |     1 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Note that window 10 has been flushed out, 
> > > > > > > > > > > > > > > > > because
> > > > > > > > > > > > > > > > > it's now closed. And window 20 is buffered 
> > > > > > > > > > > > > > > > > because
> > > > > > it
> > > > > > > > > > > > > > > > > isn't closed yet. Final Count is now:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > | window | key | value |
> > > > > > > > > > > > > > > > > > > | 10     | A   |     2 |
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > ---------------
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Reading your email, I can't figure out what 
> > > > > > > > > > > > > > > > > value
> > > > > > > > > > > > > > > > > there is in querying
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > internal suppression buffer, since it only 
> > > > > > > > > > > > > > > > > contains
> > > > > > > > > > > > > > > > > exactly the same
> > > > > > > > > > > > > > > value
> > > > > > > > > > > > > > > > > as the upstream table, for each key that is 
> > > > > > > > > > > > > > > > > still
> > > > > > > > > > > > > > > > > buffered. But it feels
> > > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > you're saying you're trying to do something
> > > > > > different
> > > > > > > > > > > > > > > > > than just query
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > windowed key and get back the current count?
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Thanks, -John
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Wed, Feb 19, 2020, at 09:49, Dongjin Lee 
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > Hi John,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 'The intermediate state of the suppression' 
> > > > > > > > > > > > > > > > > > in KIP
> > > > > > > > > > > > > > > > > > does not mean the
> > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > of upstream KTable - sure, the state of the
> > > > > > > > > > > > > > > > > > upstream KTable can be
> > > > > > > > > > > > > > > > > queried
> > > > > > > > > > > > > > > > > > by materializing the operator immediately 
> > > > > > > > > > > > > > > > > > before
> > > > > > > > > > > > > > > > > > the suppress as you
> > > > > > > > > > > > > > > > > shown.
> > > > > > > > > > > > > > > > > > What I meant in KIP was the final state of 
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > buffer, which is not
> > > > > > > > > > > > > > > > > emitted
> > > > > > > > > > > > > > > > > > yet. (I agree, the current description may 
> > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > confusing; it would be
> > > > > > > > > > > > > > > > > better
> > > > > > > > > > > > > > > > > > to change it with 'the current state of the
> > > > > > > > > > > > > > > > > > suppression' or 'the
> > > > > > > > > > > > > > > results
> > > > > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > > > the suppression', like the Jira issue
> > > > > > > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > states.)
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > For a little bit more about the motivation, 
> > > > > > > > > > > > > > > > > > here
> > > > > > is
> > > > > > > > > > > > > > > > > > one of my
> > > > > > > > > > > > > > > > > experience: I
> > > > > > > > > > > > > > > > > > had to build a monitoring application which
> > > > > > > > > > > > > > > > > > collects signals from IoT devices (say, a
> > > > > > > > > > > > > > > > > > semiconductor production line.) If the 
> > > > > > > > > > > > > > > > > > number of
> > > > > > > > > > > > > > > > > collected
> > > > > > > > > > > > > > > > > > signals within the time window is much less 
> > > > > > > > > > > > > > > > > > than
> > > > > > > > > > > > > > > > > > the expected, there
> > > > > > > > > > > > > > > may
> > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > some problems like network hiccup in the 
> > > > > > > > > > > > > > > > > > systems.
> > > > > > > > > > > > > > > > > > We wanted to build
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > system in the form of a dashboard, but 
> > > > > > > > > > > > > > > > > > could not
> > > > > > by
> > > > > > > > > > > > > > > > > > lack of
> > > > > > > > > > > > > > > materializing
> > > > > > > > > > > > > > > > > > feature. It was precisely the case of 
> > > > > > > > > > > > > > > > > > querying
> > > > > > only
> > > > > > > > > > > > > > > > > > the final
> > > > > > > > > > > > > > > results of
> > > > > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > windowed aggregation, as the Jira issue
> > > > > > > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > states. We
> > > > > > > > > > > > > > > finally
> > > > > > > > > > > > > > > > > ended
> > > > > > > > > > > > > > > > > > in implementing the system in an email 
> > > > > > > > > > > > > > > > > > alerting
> > > > > > > > > > > > > > > > > > system like this <
> > > > > > > > > > > > > > > > > > > > https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/>
> > > > > > > > > > > > > > > > > > > > and had to collect the keys and windows of trouble by hand.
> > > > > > > > > > > > > > > > > > I think these kinds of use cases would be 
> > > > > > > > > > > > > > > > > > much
> > > > > > > > > > > > > > > > > > common. Should it be described in the KIP 
> > > > > > > > > > > > > > > > > > much
> > > > > > more
> > > > > > > > > > > > > > > > > > in detail?
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Thanks, Dongjin
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On Sat, Feb 15, 2020 at 4:43 AM John Roesler
> > > > > > > > > > > > > > > > > > <vvcep...@apache.org>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > Hi Dongjin,
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks for the KIP!
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Can you explain more about why the 
> > > > > > > > > > > > > > > > > > > internal data
> > > > > > > > > > > > > > > > > > > structures of
> > > > > > > > > > > > > > > > > suppression
> > > > > > > > > > > > > > > > > > > should be queriable? The motivation just 
> > > > > > > > > > > > > > > > > > > says
> > > > > > > > > > > > > > > > > > > that users might
> > > > > > > > > > > > > > > want to
> > > > > > > > > > > > > > > > > do
> > > > > > > > > > > > > > > > > > > it, which seems like it could justify 
> > > > > > > > > > > > > > > > > > > literally
> > > > > > > > > > > > > > > > > > > anything :)
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > One design point of Suppression is that 
> > > > > > > > > > > > > > > > > > > if you
> > > > > > > > > > > > > > > > > > > wanted to query the
> > > > > > > > > > > > > > > > > “final
> > > > > > > > > > > > > > > > > > > state”, you can Materialize the suppress 
> > > > > > > > > > > > > > > > > > > itself
> > > > > > > > > > > > > > > > > > > (which is why it
> > > > > > > > > > > > > > > needs
> > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > variant); if you wanted to query the
> > > > > > > > > > > > > > > > > > > “intermediate state”, you can materialize 
> > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > operator immediately before the suppress.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Example:
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > ...count(Materialized.as(“intermediate”))
> > > > > > > > > > > > > > > > > > > > > .suppress(untilWindowClosed(),
> > > > > > > > > > > > > > > > > > > Materialized.as(“final”))
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > I’m not sure what use case would require
> > > > > > > > > > > > > > > > > > > actually fetching from the internal 
> > > > > > > > > > > > > > > > > > > buffers.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Thanks, John
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Fri, Feb 14, 2020, at 07:55, Dongjin 
> > > > > > > > > > > > > > > > > > > Lee
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > Hi devs,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > I'd like to reboot the discussion on 
> > > > > > > > > > > > > > > > > > > > KIP-508,
> > > > > > > > > > > > > > > > > > > > which aims to
> > > > > > > > > > > > > > > support a
> > > > > > > > > > > > > > > > > > > > Materialized variant of 
> > > > > > > > > > > > > > > > > > > > KTable#suppress. It
> > > > > > > > > > > > > > > > > > > > was initially
> > > > > > > > > > > > > > > submitted
> > > > > > > > > > > > > > > > > > > several
> > > > > > > > > > > > > > > > > > > > months ago but closed by the inactivity.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > - KIP:
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > > > > > > > > > > > > > > > > > > > - Jira: https://issues.apache.org/jira/browse/KAFKA-8403
> > > > > > > > > > > > > > > > > > > > All kinds of feedback will be greatly
> > > > > > > > > > > > > > > > > > > > appreciated.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Best, Dongjin
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > -- *Dongjin Lee*
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > *A hitchhiker in the mathematical 
> > > > > > > > > > > > > > > > > > > > world.*
> > > > > > > > > > > > > > > > > > > > *github:
> > > > > > > > > > > > > > > > > > > > <http://goog_969573159/>
> > > > > > github.com/dongjinleekr
> > > > > > > > > > > > > > > > > > > > 
> > > <https://github.com/dongjinleekr>linkedin:
> > > > > > > > > > > > > > > > > > > kr.linkedin.com/in/dongjinleekr
> > > > > > > > > > > > > > > > > > > > <https://kr.linkedin.com/in/dongjinleekr
> > > > > > > speakerdeck:
> > > speakerdeck.com/dongjin
> > > > > > > > > > > > > > > > > > > > <https://speakerdeck.com/dongjin>*
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > 
> > > > --
> > > > *Dongjin Lee*
> > > > 
> > > > *A hitchhiker in the mathematical world.*
> > > > 
> > > > 
> > > > 
> > > > 
> > > > *github:  <http://goog_969573159/>github.com/dongjinleekr
> > > > <https://github.com/dongjinleekr>keybase:
> > https://keybase.io/dongjinleekr
> > > > <https://keybase.io/dongjinleekr>linkedin:
> > kr.linkedin.com/in/dongjinleekr
> > > > <https://kr.linkedin.com/in/dongjinleekr>speakerdeck:
> > speakerdeck.com/dongjin
> > > > <https://speakerdeck.com/dongjin>*
> > > > 
