Thanks for the explanation; that does make sense.  I have some questions on
operations, but I'll just wait for the PR and tests.

Thanks,
Bill

On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:

> Hi Bill,
>
> Thanks for the review!
>
> Your question is very much applicable to the KIP and not at all an
> implementation detail. Thanks for bringing it up.
>
> I'm proposing not to change the existing caches and configurations at all
> (for now).
>
> Imagine you have a topology like this:
> commit.interval.ms = 100
>
> (ktable1 (cached)) -> (suppress emitAfter 200)
>
> The first ktable (ktable1) will respect the commit interval and buffer
> events for 100ms before logging, storing, or forwarding them (IIRC).
> Therefore, the second ktable (suppress) will only see the events at a rate
> of once per 100ms. It will apply its own buffering, and emit once per 200ms.
> This case is pretty trivial because the suppress time is a multiple of the
> commit interval.
>
> When it's not an integer multiple, you'll get behavior like in this marble
> diagram:
>
>
> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
>
> [ KTable caching with commit interval = 2 ]
>
> <--------(k:2)---------(k:4)---------(k:6)->
>
>       [ suppress with emitAfter = 3 ]
>
> <---------------(k:2)----------------(k:6)->
>
>
> If this behavior isn't desired (for example, if you wanted to emit (k:3) at
> time 3), I'd recommend setting "cache.max.bytes.buffering" to 0 or
> modifying the topology to disable caching. Then, the behavior is
> determined by the suppress operator alone.
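The marble diagram above can be reproduced with a small toy model. This is only a sketch under stated assumptions: each stage is modeled as a buffer that keeps the latest value and flushes it on every multiple of its period, and a record arriving on a flush tick is included in that flush. The class and method names (MarbleSketch, flushEvery) are made up for illustration and are not part of Kafka Streams or the KIP.

```java
import java.util.TreeMap;

public class MarbleSketch {
    // Toy stage: remember the latest value seen and emit it at every
    // multiple of `period`, provided something arrived since the last flush.
    static TreeMap<Integer, String> flushEvery(int period,
                                               TreeMap<Integer, String> input,
                                               int endTime) {
        TreeMap<Integer, String> output = new TreeMap<>();
        String buffered = null;
        for (int t = 1; t <= endTime; t++) {
            String arrived = input.get(t);
            if (arrived != null) {
                buffered = arrived;        // newer value replaces the older one
            }
            if (t % period == 0 && buffered != null) {
                output.put(t, buffered);   // flush on the period boundary
                buffered = null;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Source stream: (k:1) at time 1 through (k:6) at time 6.
        TreeMap<Integer, String> source = new TreeMap<>();
        for (int t = 1; t <= 6; t++) {
            source.put(t, "k:" + t);
        }
        TreeMap<Integer, String> cached = flushEvery(2, source, 6);
        System.out.println("cached:     " + cached);
        System.out.println("suppressed: " + flushEvery(3, cached, 6));
        // Assumed model of the cache being disabled: suppress sees every record.
        System.out.println("no cache:   " + flushEvery(3, source, 6));
    }
}
```

Under these assumptions the cached stage emits at times 2, 4, and 6, the suppressed stage emits (k:2) at time 3 and (k:6) at time 6, and with the cache taken out the suppress stage emits (k:3) at time 3.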
>
> Does that seem right to you?
>
>
> Regarding the changelogs, because the suppression operator hangs onto
> events for a while, it will need its own changelog. The changelog
> should represent the current state of the buffer at all times. So when the
> suppress operator sees (k:2), for example, it will log (k:2). When it
> later gets to time 3, it's time to emit (k:2) downstream. Because k is no
> longer buffered, the suppress operator will log (k:null). Thus, when
> recovering, it can rebuild the buffer by reading its changelog.
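The changelog behavior described above might be sketched as follows. This is an in-memory illustration only, assuming a plain list stands in for the changelog topic and a null value acts as the "no longer buffered" marker. The class SuppressBufferSketch and its methods are hypothetical names, not the proposed implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SuppressBufferSketch {
    // Records currently held back by the suppress operator.
    final Map<String, String> buffer = new HashMap<>();
    // Stand-in for the changelog topic: an append-only list of (key, value).
    final List<Map.Entry<String, String>> changelog = new ArrayList<>();

    // A new record arrives: buffer it and log the buffered state.
    void onRecord(String key, String value) {
        buffer.put(key, value);
        changelog.add(new SimpleEntry<>(key, value));
    }

    // Time to emit: drop the key from the buffer and log (key:null).
    String emit(String key) {
        String value = buffer.remove(key);
        changelog.add(new SimpleEntry<>(key, null));
        return value;
    }

    // Recovery: replay the changelog from the start to rebuild the buffer.
    static Map<String, String> restore(List<Map.Entry<String, String>> log) {
        Map<String, String> rebuilt = new HashMap<>();
        for (Map.Entry<String, String> entry : log) {
            if (entry.getValue() == null) {
                rebuilt.remove(entry.getKey());
            } else {
                rebuilt.put(entry.getKey(), entry.getValue());
            }
        }
        return rebuilt;
    }
}
```

Replaying the log at any point yields exactly the records that were still buffered at that point, which is the property the changelog needs for recovery.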
>
> What do you think about this?
>
> Thanks,
> -John
>
>
>
> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi John,  thanks for the KIP.
> >
> > Early on in the KIP, you mention the current approaches for controlling
> > the rate of downstream records from a KTable: cache size configuration
> > and commit time.
> >
> > Will these configuration parameters still be in effect for tables that
> > don't use suppression?  For tables taking advantage of suppression, will
> > these configurations have no impact?
> > This last question may be too implementation-specific, but if the
> > requested suppression time is longer than the specified commit time, will
> > the latest record in the suppression buffer get stored in a changelog?
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> >
> > > Thanks for the feedback, Matthias,
> > >
> > > It seems like in straightforward relational processing cases, it would
> > > not make sense to bound the lateness of KTables. In general, it seems
> > > better to have "guard rails" in place that make it easier to write
> > > sensible programs than insensible ones.
> > >
> > > But I'm still going to argue in favor of keeping it for all KTables ;)
> > >
> > > 1. I believe it is simpler to understand the operator if it has one
> > > uniform definition, regardless of context. It's well defined and
> > > intuitive what will happen when you use late-event suppression on a
> > > KTable, so I think nothing surprising or dangerous will happen in that
> > > case. From my perspective, having two sets of allowed operations is
> > > actually an increase in cognitive complexity.
> > >
> > > 2. To me, it's not crazy to use the operator this way. For example, in
> > > lieu of full-featured timestamp semantics, I can implement MVCC behavior
> > > when building a KTable by "suppressLateEvents(Duration.ZERO)". I suspect
> > > that there are other, non-obvious applications of suppressing late
> > > events on KTables.
> > >
> > > 3. Not to get too much into implementation details in a KIP discussion,
> > > but if we did want to make late-event suppression available only on
> > > windowed KTables, we have two enforcement options:
> > >   a. check when we build the topology - this would be simple to
> > > implement, but would be a runtime check. Hopefully, people write tests
> > > for their topology before deploying them, so the feedback loop isn't
> > > instantaneous, but it's not too long either.
> > >   b. add a new WindowedKTable type - this would be a compile-time check,
> > > but would also be a substantial increase in both interface and code
> > > complexity.
> > >
> > > We should definitely strive to have guard rails protecting against
> > > surprising or dangerous behavior. Protecting against programs that we
> > > don't currently predict is a lesser benefit, and I think we can put up
> > > guard rails on a case-by-case basis for that. The increase in cognitive
> > > (and potentially code and interface) complexity makes me think we
> > > should skip this case.
> > >
> > > What do you think?
> > >
> > > Thanks,
> > > -John
> > >
> > > On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > Thanks for the KIP John.
> > > >
> > > > One initial comment about the last example "Bounded lateness": For a
> > > > non-windowed KTable bounding the lateness does not really make sense,
> > > > does it?
> > > >
> > > > Thus, I am wondering if we should allow `suppressLateEvents()` for
> > > > this case? It seems to be better to only allow it for windowed-KTables.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 6/27/18 8:53 AM, Ted Yu wrote:
> > > > > I noticed this (lack of primary parameter) as well.
> > > > >
> > > > > What you gave as a new example is semantically the same as what I
> > > > > suggested, so it is good by me.
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> Thanks for taking a look, Ted,
> > > > >>
> > > > >> I agree this is a departure from the conventions of Streams DSL.
> > > > >>
> > > > >> Most of our config objects have one or two "required" parameters,
> > > > >> which fit naturally with the static factory method approach.
> > > > >> TimeWindows, for example, requires a size parameter, so we can
> > > > >> naturally say TimeWindows.of(size).
> > > > >>
> > > > >> I think in the case of a suppression, there's really no "core"
> > > > >> parameter, and "Suppression.of()" seems sillier than "new
> > > > >> Suppression()". I think that Suppression.of(duration) would be
> > > > >> ambiguous, since there are many durations that we can configure.
> > > > >>
> > > > >> However, thinking about it again, I suppose that I can give each
> > > > >> configuration method a static version, which would let you replace
> > > > >> "new Suppression()." with "Suppression." in all the examples.
> > > > >> Basically, instead of "of()", we'd support any of the methods I
> > > > >> listed.
> > > > >>
> > > > >> For example:
> > > > >>
> > > > >> windowCounts
> > > > >>     .suppress(
> > > > >>         Suppression
> > > > >>             .suppressLateEvents(Duration.ofMinutes(10))
> > > > >>             .suppressIntermediateEvents(
> > > > >>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> > > > >>             )
> > > > >>     );
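One way the "static version of each configuration method" idea could be laid out is sketched below. Everything here is hypothetical: the Config and Suppression classes are stand-ins, and the intermediate suppression is simplified to a plain Duration rather than an IntermediateSuppression object. The sketch also assumes the static entry points return a separate chainable type, because Java does not allow a static method and an instance method with the same name and parameter types in one class.

```java
import java.time.Duration;

public class SuppressionSketch {
    // Hypothetical immutable config with chainable instance methods.
    static final class Config {
        final Duration lateEventBound;   // null means "not configured"
        final Duration emitAfter;        // null means "not configured"

        Config(Duration lateEventBound, Duration emitAfter) {
            this.lateEventBound = lateEventBound;
            this.emitAfter = emitAfter;
        }

        Config suppressLateEvents(Duration bound) {
            return new Config(bound, emitAfter);
        }

        Config suppressIntermediateEvents(Duration after) {
            return new Config(lateEventBound, after);
        }
    }

    // Static entry points only: each starts a chain from an empty Config,
    // so callers write "Suppression." instead of "new Suppression().".
    static final class Suppression {
        private Suppression() {}

        static Config suppressLateEvents(Duration bound) {
            return new Config(null, null).suppressLateEvents(bound);
        }

        static Config suppressIntermediateEvents(Duration after) {
            return new Config(null, null).suppressIntermediateEvents(after);
        }
    }

    public static void main(String[] args) {
        Config config = Suppression
            .suppressLateEvents(Duration.ofMinutes(10))
            .suppressIntermediateEvents(Duration.ofMinutes(10));
        System.out.println(config.lateEventBound + " / " + config.emitAfter);
    }
}
```

Any configuration method can start the chain, so no single "core" parameter has to be singled out the way an "of()" factory would require.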
> > > > >>
> > > > >>
> > > > >> Does that seem better?
> > > > >>
> > > > >> Thanks,
> > > > >> -John
> > > > >>
> > > > >>
> > > > >> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> I started to read this KIP, which contains a lot of material.
> > > > >>>
> > > > >>> One suggestion:
> > > > >>>
> > > > >>>     .suppress(
> > > > >>>         new Suppression()
> > > > >>>
> > > > >>>
> > > > >>> Do you think it would be more consistent with the rest of the Streams
> > > > >>> data structures to support `of`?
> > > > >>>
> > > > >>> Suppression.of(Duration.ofMinutes(10))
> > > > >>>
> > > > >>>
> > > > >>> Cheers
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hello devs and users,
> > > > >>>>
> > > > >>>> Please take some time to consider this proposal for Kafka Streams:
> > > > >>>>
> > > > >>>> KIP-328: Ability to suppress updates for KTables
> > > > >>>>
> > > > >>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> > > > >>>>
> > > > >>>> The basic idea is to provide:
> > > > >>>> * more usable control over update rate (vs the current state store
> > > > >>>> caches)
> > > > >>>> * the final-result-for-windowed-computations feature which several
> > > > >>>> people have requested
> > > > >>>>
> > > > >>>> I look forward to your feedback!
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>> -John
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>
