Thanks for the explanation, that does make sense. I have some questions on operations, but I'll just wait for the PR and tests.
Thanks,
Bill

On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:

> Hi Bill,
>
> Thanks for the review!
>
> Your question is very much applicable to the KIP and not at all an
> implementation detail. Thanks for bringing it up.
>
> I'm proposing not to change the existing caches and configurations at all
> (for now).
>
> Imagine you have a topology like this:
> commit.interval.ms = 100
>
> (ktable1 (cached)) -> (suppress emitAfter 200)
>
> The first ktable (ktable1) will respect the commit interval and buffer
> events for 100ms before logging, storing, or forwarding them (IIRC).
> Therefore, the second ktable (suppress) will only see the events at a rate
> of once per 100ms. It will apply its own buffering, and emit once per
> 200ms. This case is pretty trivial because the suppress time is a multiple
> of the commit interval.
>
> When it's not an integer multiple, you'll get behavior like in this marble
> diagram:
>
>
> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
>
> [ KTable caching with commit interval = 2 ]
>
> <--------(k:2)---------(k:4)---------(k:6)->
>
> [ suppress with emitAfter = 3 ]
>
> <---------------(k:2)----------------(k:6)->
>
>
> If this behavior isn't desired (for example, if you wanted to emit (k:3) at
> time 3), I'd recommend setting the "cache.max.bytes.buffering" to 0 or
> modifying the topology to disable caching. Then, the behavior is
> determined by the suppress operator alone.
>
> Does that seem right to you?
>
>
> Regarding the changelogs, because the suppression operator hangs onto
> events for a while, it will need its own changelog. The changelog
> should represent the current state of the buffer at all times. So when the
> suppress operator sees (k:2), for example, it will log (k:2). When it
> later gets to time 3, it's time to emit (k:2) downstream. Because k is no
> longer buffered, the suppress operator will log (k:null). Thus, when
> recovering, it can rebuild the buffer by reading its changelog.
>
> What do you think about this?
>
> Thanks,
> -John
>
>
>
> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi John, thanks for the KIP.
> >
> > Early on in the KIP, you mention the current approaches for controlling
> > the rate of downstream records from a KTable, cache size configuration
> > and commit time.
> >
> > Will these configuration parameters still be in effect for tables that
> > don't use suppression? For tables taking advantage of suppression, will
> > these configurations have no impact?
> > This last question may be too implementation-specific, but if the
> > requested suppression time is longer than the specified commit time,
> > will the latest record in the suppression buffer get stored in a
> > changelog?
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> >
> > > Thanks for the feedback, Matthias,
> > >
> > > It seems like in straightforward relational processing cases, it
> > > would not make sense to bound the lateness of KTables. In general, it
> > > seems better to have "guard rails" in place that make it easier to
> > > write sensible programs than insensible ones.
> > >
> > > But I'm still going to argue in favor of keeping it for all KTables ;)
> > >
> > > 1. I believe it is simpler to understand the operator if it has one
> > > uniform definition, regardless of context. It's well defined and
> > > intuitive what will happen when you use late-event suppression on a
> > > KTable, so I think nothing surprising or dangerous will happen in
> > > that case. From my perspective, having two sets of allowed operations
> > > is actually an increase in cognitive complexity.
> > >
> > > 2. To me, it's not crazy to use the operator this way. For example,
> > > in lieu of full-featured timestamp semantics, I can implement MVCC
> > > behavior when building a KTable by
> > > "suppressLateEvents(Duration.ZERO)".
> > > I suspect that there are other, non-obvious applications of
> > > suppressing late events on KTables.
> > >
> > > 3. Not to get too much into implementation details in a KIP
> > > discussion, but if we did want to make late-event suppression
> > > available only on windowed KTables, we have two enforcement options:
> > >    a. check when we build the topology - this would be simple to
> > >       implement, but would be a runtime check. Hopefully, people write
> > >       tests for their topology before deploying them, so the feedback
> > >       loop isn't instantaneous, but it's not too long either.
> > >    b. add a new WindowedKTable type - this would be a compile time
> > >       check, but would also be a substantial increase of both
> > >       interface and code complexity.
> > >
> > > We should definitely strive to have guard rails protecting against
> > > surprising or dangerous behavior. Protecting against programs that we
> > > don't currently predict is a lesser benefit, and I think we can put
> > > up guard rails on a case-by-case basis for that. The increase in
> > > cognitive (and potentially code and interface) complexity makes me
> > > think we should skip this case.
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax
> > > <matth...@confluent.io> wrote:
> > >
> > > > Thanks for the KIP John.
> > > >
> > > > One initial comment about the last example "Bounded lateness": For
> > > > a non-windowed KTable bounding the lateness does not really make
> > > > sense, does it?
> > > >
> > > > Thus, I am wondering if we should allow `suppressLateEvents()` for
> > > > this case? It seems to be better to only allow it for
> > > > windowed-KTables.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > > I noticed this (lack of primary parameter) as well.
> > > > >
> > > > > What you gave as the new example is semantically the same as
> > > > > what I suggested.
> > > > > So it is good by me.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Thanks for taking a look, Ted,
> > > > >>
> > > > >> I agree this is a departure from the conventions of Streams DSL.
> > > > >>
> > > > >> Most of our config objects have one or two "required" parameters,
> > > > >> which fit naturally with the static factory method approach.
> > > > >> TimeWindow, for example, requires a size parameter, so we can
> > > > >> naturally say TimeWindows.of(size).
> > > > >>
> > > > >> I think in the case of a suppression, there's really no "core"
> > > > >> parameter, and "Suppression.of()" seems sillier than
> > > > >> "new Suppression()". I think that Suppression.of(duration) would
> > > > >> be ambiguous, since there are many durations that we can
> > > > >> configure.
> > > > >>
> > > > >> However, thinking about it again, I suppose that I can give each
> > > > >> configuration method a static version, which would let you
> > > > >> replace "new Suppression()." with "Suppression." in all the
> > > > >> examples. Basically, instead of "of()", we'd support any of the
> > > > >> methods I listed.
> > > > >>
> > > > >> For example:
> > > > >>
> > > > >> windowCounts
> > > > >>     .suppress(
> > > > >>         Suppression
> > > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > > > >>             .suppressIntermediateEvents(
> > > > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > > >>             )
> > > > >>     );
> > > > >>
> > > > >>
> > > > >> Does that seem better?
> > > > >>
> > > > >> Thanks,
> > > > >> -John
> > > > >>
> > > > >>
> > > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> I started to read this KIP which contains a lot of materials.
> > > > >>>
> > > > >>> One suggestion:
> > > > >>>
> > > > >>> .suppress(
> > > > >>>     new Suppression()
> > > > >>>
> > > > >>>
> > > > >>> Do you think it would be more consistent with the rest of
> > > > >>> Streams data structures by supporting `of` ?
> > > > >>>
> > > > >>> Suppression.of(Duration.ofMinutes(10))
> > > > >>>
> > > > >>>
> > > > >>> Cheers
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hello devs and users,
> > > > >>>>
> > > > >>>> Please take some time to consider this proposal for Kafka
> > > > >>>> Streams:
> > > > >>>>
> > > > >>>> KIP-328: Ability to suppress updates for KTables
> > > > >>>>
> > > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > > >>>>
> > > > >>>> The basic idea is to provide:
> > > > >>>> * more usable control over update rate (vs the current state
> > > > >>>>   store caches)
> > > > >>>> * the final-result-for-windowed-computations feature which
> > > > >>>>   several people have requested
> > > > >>>>
> > > > >>>> I look forward to your feedback!
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> -John
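
The interaction John describes between the KTable cache and the suppress
operator (commit interval = 2, emitAfter = 3), together with the changelog
behavior of the suppression buffer, can be approximated with a toy
single-key model. This is only a sketch under stated assumptions: both
stages are modeled as buffers that flush whenever an integer clock reaches a
multiple of their period, which is one reading of the marble diagram and not
necessarily how Kafka Streams schedules flushes. The names SuppressSketch,
PeriodicBuffer, Result and simulate are hypothetical and are not part of
Kafka Streams or of the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: not Kafka Streams code and not the KIP's implementation.
final class SuppressSketch {

    /** Holds the latest value for a single key "k" and flushes on a fixed period. */
    static final class PeriodicBuffer {
        private final int period;
        private Integer latest; // null means nothing is buffered
        final List<String> changelog = new ArrayList<>();

        PeriodicBuffer(int period) {
            this.period = period;
        }

        /** Buffers a new value and logs the new buffer state. */
        void put(int value) {
            latest = value;
            changelog.add("k:" + value);
        }

        /** Returns the value to forward at this time, or null if nothing is due. */
        Integer flushIfDue(int time) {
            if (time % period != 0 || latest == null) {
                return null;
            }
            Integer out = latest;
            latest = null;
            changelog.add("k:null"); // tombstone: k is no longer buffered
            return out;
        }
    }

    /** Downstream emissions plus the suppress stage's changelog. */
    static final class Result {
        final List<String> emitted;
        final List<String> suppressChangelog;

        Result(List<String> emitted, List<String> suppressChangelog) {
            this.emitted = emitted;
            this.suppressChangelog = suppressChangelog;
        }
    }

    /** Feeds (k:1)..(k:lastTime), one per time unit, through cache then suppress. */
    static Result simulate(int commitInterval, int emitAfter, int lastTime) {
        PeriodicBuffer cache = new PeriodicBuffer(commitInterval);
        PeriodicBuffer suppress = new PeriodicBuffer(emitAfter);
        List<String> emitted = new ArrayList<>();
        for (int t = 1; t <= lastTime; t++) {
            cache.put(t);
            Integer fromCache = cache.flushIfDue(t);
            if (fromCache != null) {
                suppress.put(fromCache);
            }
            Integer downstream = suppress.flushIfDue(t);
            if (downstream != null) {
                emitted.add("t=" + t + " k:" + downstream);
            }
        }
        return new Result(emitted, suppress.changelog);
    }

    public static void main(String[] args) {
        Result cached = simulate(2, 3, 6);
        System.out.println(cached.emitted);            // [t=3 k:2, t=6 k:6]
        System.out.println(cached.suppressChangelog);  // [k:2, k:null, k:4, k:6, k:null]
        // A commit interval of 1 stands in for disabled caching in this model.
        System.out.println(simulate(1, 3, 6).emitted); // [t=3 k:3, t=6 k:6]
    }
}
```

With the cache in front, the model emits (k:2) at time 3 and (k:6) at time
6, as in the diagram. With a commit interval of 1 (standing in for caching
disabled), it emits (k:3) at time 3 instead. The changelog of the suppress
stage always mirrors the buffer, so replaying it rebuilds the buffered state.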