That is a good point.

I cannot think of a better option than documentation plus a warning, especially
given that we'd better not reuse the function name `until` for close time.


Guozhang


On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:

> I had some opportunity to reflect on the default for close time today...
>
> Note that the current "close time" is equal to the retention time, and
> therefore "close" today shares the default retention of 24h.
>
> It would definitely break any application that today specifies a retention
> time to set close shorter than that time. It's also likely to break apps if
> they *don't* set the retention time and rely on the 24h default. So it's
> unfortunate, but I think if "close" isn't set, we should use the retention
> time instead of a fixed default.
>
> When we ultimately remove the retention time parameter ("until"), we will
> have to set "close" to a default of 24h.
>
> Of course, this has a negative impact on users of "final results", since
> they won't see any output at all until the retention time (24h by default)
> has elapsed, and may find this confusing. What can we do about this except
> document it well? Maybe log a warning if we see that close wasn't
> explicitly set while using "final results"?
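A minimal sketch of the defaulting rule proposed above, assuming a hypothetical helper (`CloseTimeDefaults.effectiveClose` is not a Kafka Streams API). An explicit close wins; otherwise close falls back to the retention time set via `until`, or to the 24h default, and a warning is logged when "final results" are requested without an explicit close.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical helper, not part of Kafka Streams: models the proposed defaulting of "close".
final class CloseTimeDefaults {
    private static final Logger LOG = Logger.getLogger(CloseTimeDefaults.class.getName());

    // The 24h default retention mentioned in the thread.
    static final Duration DEFAULT_RETENTION = Duration.ofHours(24);

    private CloseTimeDefaults() {}

    static Duration effectiveClose(final Optional<Duration> explicitClose,
                                   final Optional<Duration> until,
                                   final boolean finalResultsOnly) {
        if (explicitClose.isPresent()) {
            return explicitClose.get();
        }
        // Backward compatibility: fall back to the retention time, or to its 24h default.
        final Duration fallback = until.orElse(DEFAULT_RETENTION);
        if (finalResultsOnly) {
            LOG.warning("close time not set; falling back to " + fallback
                + ", so final results may be delayed");
        }
        return fallback;
    }
}
```

Keeping the warning tied to the "final results" case avoids noise for applications that only consume intermediate updates.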
>
> Thanks,
> -John
>
> On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > That sounds good to me. I'll include that in the KIP.
> >
> > Thanks,
> > -John
> >
> > On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Let me clarify a bit what I meant about moving `retentionPeriod` to
> >> WindowStoreBuilder:
> >>
> >> In another discussion we had around KIP-319 / 330, we noted that the
> >> "retention period" should not really be a window spec, but only a window
> >> store spec, as it only affects how long each window is retained (and
> >> queryable), along with the storage cost.
> >>
> >> More specifically, today the "maintainMs" returned from Windows is used in
> >> three places:
> >>
> >> 1) for windowed aggregations, it is passed directly into
> >> `Stores.persistentWindowStore()` as the retention period parameter. For
> >> this use case we should just let the WindowStoreBuilder specify this value
> >> itself.
> >>
> >> NOTE: It is also used in the KStreamWindowAggregate processor, to
> >> determine if a received record should be dropped due to its lateness. We
> >> may need to think of another way to get this value inside the processor.
> >>
> >> 2) for windowed stream-stream join, it is used as the join range parameter,
> >> but only to check that "windowSizeMs <= retentionPeriodMs". We can do this
> >> check at the store builder level instead of at the processor level.
> >>
> >>
> >> If we can remove its usage in both 1) and 2), then we should be able to
> >> safely remove this from the `Windows` spec.
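To make the idea concrete, here is a minimal sketch of running the "windowSizeMs <= retentionPeriodMs" check once, when a window store is built, rather than inside each processor. `WindowStoreSpecBuilder` and `WindowStoreSpec` are hypothetical stand-ins, not Kafka's `WindowStoreBuilder`.

```java
// Hypothetical builder, not Kafka's WindowStoreBuilder: validates the spec at build time.
final class WindowStoreSpecBuilder {
    // Immutable result of a successful build.
    record WindowStoreSpec(String name, long windowSizeMs, long retentionPeriodMs) {}

    private final String name;
    private long windowSizeMs;
    private long retentionPeriodMs;

    WindowStoreSpecBuilder(final String name) {
        this.name = name;
    }

    WindowStoreSpecBuilder windowSize(final long ms) {
        this.windowSizeMs = ms;
        return this;
    }

    WindowStoreSpecBuilder retentionPeriod(final long ms) {
        this.retentionPeriodMs = ms;
        return this;
    }

    WindowStoreSpec build() {
        // The check the join processor performs today, moved to the store builder.
        if (windowSizeMs > retentionPeriodMs) {
            throw new IllegalArgumentException("window size (" + windowSizeMs
                + " ms) must not exceed the retention period (" + retentionPeriodMs + " ms)");
        }
        return new WindowStoreSpec(name, windowSizeMs, retentionPeriodMs);
    }
}
```

A misconfigured store then fails when the topology is defined, instead of the check living in the processor.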
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io> wrote:
> >>
> >> > Thanks for the reply, Guozhang,
> >> >
> >> > Good! I agree, that is also a good reason, and I actually made use of that
> >> > in my tests. I'll update the KIP.
> >> >
> >> > By the way, I chose "allowedLateness" as I was trying to pick a better name
> >> > than "close", but I think it's actually the wrong name. We don't want to
> >> > bound the lateness of events in general, only with respect to the end of
> >> > their window.
> >> >
> >> > If we have a window [0,10), with "allowedLateness" of 5, then if we get an
> >> > event with timestamp 3 at time 9, the name implies we'd reject it, which
> >> > seems silly. Really, we'd only want to start rejecting that event at stream
> >> > time 15.
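The difference can be made concrete with a small, self-contained sketch (hypothetical helper names; tumbling windows and non-negative timestamps assumed). A lateness bound measured from the record's own timestamp rejects the event in the example, while a close measured from the end of its window keeps accepting it until stream time 15.

```java
// Hypothetical helpers contrasting two ways of bounding lateness for tumbling windows.
final class WindowCloseSketch {
    private WindowCloseSketch() {}

    // Start of the tumbling window [start, start + sizeMs) containing ts (ts >= 0 assumed).
    static long windowStart(final long ts, final long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // "allowedLateness" read literally: reject once the record is more than N ms behind stream time.
    static boolean naiveAccepts(final long recordTs, final long streamTime, final long allowedLatenessMs) {
        return streamTime - recordTs <= allowedLatenessMs;
    }

    // "close" measured from the window end: accept until stream time reaches windowEnd + close.
    static boolean acceptsBeforeClose(final long recordTs, final long streamTime,
                                      final long sizeMs, final long closeMs) {
        final long windowEnd = windowStart(recordTs, sizeMs) + sizeMs;
        return streamTime < windowEnd + closeMs;
    }
}
```

For the event with timestamp 3 in window [0,10) and a bound of 5, `naiveAccepts(3, 9, 5)` is false, whereas `acceptsBeforeClose(3, 9, 10, 5)` and `acceptsBeforeClose(3, 14, 10, 5)` are true and `acceptsBeforeClose(3, 15, 10, 5)` is false.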
> >> >
> >> > What I meant was more like "allowedLatenessAfterWindowEnd", but that's too
> >> > verbose. I think that "close" + some documentation about what it means will
> >> > be better.
> >> >
> >> > 1: "Close" would be measured from the end of the window, so a reasonable
> >> > default would be "0". Recall that "close" really only needs to be specified
> >> > for final results, and a default of 0 would produce the most intuitive
> >> > results. If folks later discover that they are missing some late events,
> >> > they can adjust the parameter accordingly. IMHO, any other value would just
> >> > be a guess on our part.
> >> >
> >> > 2a:
> >> > I think you're saying to re-use "until" instead of adding "close" to the
> >> > window.
> >> >
> >> > The downside here would be that the semantic change could be more confusing
> >> > than deprecating "until" and introducing window "close" and a
> >> > "retentionTime" on the store builder. The deprecation is a good, controlled
> >> > way for us to make sure people are getting the semantics they think they're
> >> > getting, as well as giving us an opportunity to link people to the API they
> >> > should use instead.
> >> >
> >> > I didn't fully understand the second part, but it sounds like you're
> >> > suggesting to add a new "retentionTime" setter to Windows to bridge the gap
> >> > until we add it to the store builder? That seems kind of roundabout to me,
> >> > if that's what you meant. We could just immediately add it to the store
> >> > builders in the same PR.
> >> >
> >> > 2b: Sounds good to me!
> >> >
> >> > Thanks again,
> >> > -John
> >> >
> >> >
> >> > On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >
> >> > > John,
> >> > >
> >> > > Thanks for your replies. As for the two options of the API, I think I'm
> >> > > slightly inclined to the first option as well. My motivation is a bit
> >> > > different, as I think the first one may be more flexible, for example:
> >> > >
> >> > > KTable<Windowed<..>> table = ... count();
> >> > >
> >> > > // want to peek at the changelog stream, do not care about final results
> >> > > table.toStream().peek(..);
> >> > >
> >> > > // sending to a topic, want to only send the final results
> >> > > table.suppress().toStream().to("topic");
> >> > >
> >> > > --------------
> >> > >
> >> > > Besides that, I have a few more minor questions:
> >> > >
> >> > > 1. For "allowedLateness", what should be the default value? I.e. if users
> >> > > do not specify "allowedLateness" in TimeWindows, what value should we set?
> >> > >
> >> > > 2. For API names, some personal suggestions here:
> >> > >
> >> > > 2.a) "allowedLateness" -> "until" (semantics changed, and the value is
> >> > > defined as a delta on top of the window length), while the current "until"
> >> > > becomes "retentionPeriod", which will be moved from `Windows` to
> >> > > `WindowStoreBuilder` in the future.
> >> > >
> >> > > 2.b) "BufferConfig" -> "Buffered" ?
> >> > >
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io> wrote:
> >> > >
> >> > > > Hey Matthias and Guozhang,
> >> > > >
> >> > > > Sorry for the slow reply. I was mulling over your feedback and weighing
> >> > > > some ideas in a sketchbook PR (https://github.com/apache/kafka/pull/5337).
> >> > > >
> >> > > > Your thought about keeping suppression independent of business logic is a
> >> > > > very good one. I agree that it would make more sense to add some kind of
> >> > > > "window close" concept to the window definition.
> >> > > >
> >> > > > In fact, doing that immediately solves the inconsistency problem Guozhang
> >> > > > brought up. There's no need to add a "final results" or "emission" option
> >> > > > to the windowed aggregation.
> >> > > >
> >> > > > What do you think about an API more like this:
> >> > > >
> >> > > > final StreamsBuilder builder = new StreamsBuilder();
> >> > > >
> >> > > > builder
> >> > > >   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
> >> > > >   .groupBy(
> >> > > >     (String k1, String v1) -> k1,
> >> > > >     Serialized.with(STRING_SERDE, STRING_SERDE)
> >> > > >   )
> >> > > >   .windowedBy(TimeWindows
> >> > > >     .of(scaledTime(2L))
> >> > > >     .until(scaledTime(3L))
> >> > > >     .allowedLateness(scaledTime(1L))
> >> > > >   )
> >> > > >   .count(Materialized.as("counts"))
> >> > > >   .suppress(
> >> > > >     emitFinalResultsOnly(
> >> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> >> > > >     )
> >> > > >   )
> >> > > >   .toStream()
> >> > > >   .to("output-suppressed", Produced.with(STRING_SERDE, LONG_SERDE));
> >> > > >
> >> > > > Note that:
> >> > > >  * "emitFinalResultsOnly" is available *only* on windowed tables (enforced
> >> > > > by the type system at compile time), and it determines the time to wait by
> >> > > > looking at "allowedLateness" on the TimeWindows config.
> >> > > >  * querying "counts" will produce results (eventually) consistent with
> >> > > > what's observable in "output-suppressed".
> >> > > >  * in all cases, "suppress" has no effect on business logic, just on event
> >> > > > suppression.
> >> > > >
> >> > > > Is this API straightforward? Or do you still prefer the version that you
> >> > > > both proposed:
> >> > > >
> >> > > >   ...
> >> > > >   .windowedBy(TimeWindows
> >> > > >     .of(scaledTime(2L))
> >> > > >     .until(scaledTime(3L))
> >> > > >     .allowedLateness(scaledTime(1L))
> >> > > >   )
> >> > > >   .count(
> >> > > >     Materialized.as("counts"),
> >> > > >     emitFinalResultsOnly(
> >> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(SHUT_DOWN)
> >> > > >     )
> >> > > >   )
> >> > > >   ...
> >> > > >
> >> > > > To me, these two are practically identical, and I still vaguely prefer the
> >> > > > first one.
> >> > > >
> >> > > > The prototype has made clearer to me that users of "final results for
> >> > > > windows" and users of "suppression for table events" both need to configure
> >> > > > the suppression buffer.
> >> > > >
> >> > > > This buffer configuration consists of:
> >> > > > 1. how many keys or bytes to keep in memory
> >> > > > 2. what to do if memory runs out (shut down, start using disk, ...)
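To illustrate what those two settings control, here is a minimal in-memory model of a "final results only" buffer. All names (`FinalResultsBuffer`, `WindowKey`) are hypothetical, and this is not the Kafka Streams implementation. It keeps only the latest value per windowed key, emits a window once stream time reaches window end plus close, and models only the "shut down" strategy by throwing when a new key would exceed the key limit.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a suppression buffer; not Kafka Streams code.
final class FinalResultsBuffer {
    record WindowKey(String key, long windowStart, long windowEnd) {}

    private final long maxKeys;
    private final long closeMs;
    // Latest aggregate per window, in insertion order.
    private final Map<WindowKey, Long> buffer = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;

    FinalResultsBuffer(final long maxKeys, final long closeMs) {
        this.maxKeys = maxKeys;
        this.closeMs = closeMs;
    }

    // Buffers an intermediate update and returns the final results of windows that closed.
    List<Map.Entry<WindowKey, Long>> update(final WindowKey wk, final long value, final long recordTs) {
        streamTime = Math.max(streamTime, recordTs);
        final List<Map.Entry<WindowKey, Long>> closed = evictClosed();
        if (wk.windowEnd() + closeMs <= streamTime) {
            return closed; // update for an already-closed window: dropped
        }
        if (!buffer.containsKey(wk) && buffer.size() >= maxKeys) {
            // "shut down" strategy: fail instead of emitting early or spilling to disk
            throw new IllegalStateException("suppression buffer full (" + maxKeys + " keys)");
        }
        buffer.put(wk, value);
        return closed;
    }

    private List<Map.Entry<WindowKey, Long>> evictClosed() {
        final List<Map.Entry<WindowKey, Long>> out = new ArrayList<>();
        final Iterator<Map.Entry<WindowKey, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<WindowKey, Long> e = it.next();
            if (e.getKey().windowEnd() + closeMs <= streamTime) {
                out.add(Map.entry(e.getKey(), e.getValue()));
                it.remove();
            }
        }
        return out;
    }
}
```

With a close of 0, two updates to window [0,10) produce nothing downstream, and the first record at stream time 12 releases only the last value (2) for that window.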
> >> > > >
> >> > > > So it's not as simple as setting a "final results" flag. We'll either have
> >> > > > an "Emit" config object on the windowed aggregators that takes the same
> >> > > > BufferConfig as the "Suppress" config on the suppression operator, or we
> >> > > > just use the suppression operator for both.
> >> > > >
> >> > > > Perhaps it would sweeten the deal a little to point out that we have 2
> >> > > > overloads already for each windowed aggregator (with and without
> >> > > > Materialized). Adding "Emitted" or something would mean that we'd add a new
> >> > > > overload for each one, taking us up to 4 overloads each for "count",
> >> > > > "aggregate" and "reduce". Using "suppress" means that we don't add any new
> >> > > > overloads.
> >> > > >
> >> > > > Thanks again for helping to hash this out,
> >> > > > -John
> >> > > >
> >> > > > On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > >
> >> > > > > I think I agree with Matthias on having dedicated APIs for the windowed
> >> > > > > operation final output scenario, PLUS separating the window close, which
> >> > > > > the "final output" would rely on, from the window retention time itself
> >> > > > > (admittedly it would make this KIP effort larger, but if we believe we need
> >> > > > > to do this separation anyway, we could just do it now).
> >> > > > >
> >> > > > > And then we can have `KTable#suppress()` for intermediate-suppression
> >> > > > > only, not for late-record-suppression, until we see that become a common
> >> > > > > feature request, since our current design still allows it to be extended
> >> > > > > for that purpose.
> >> > > > >
> >> > > > >
> >> > > > > Guozhang
> >> > > > >
> >> > > > > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >> > > > >
> >> > > > > > Thanks for the discussion. I am just catching up.
> >> > > > > >
> >> > > > > > In general, I think we have different use cases, and non-windowed and
> >> > > > > > windowed are quite different. For the non-windowed case, suppress() has
> >> > > > > > no (useful) close or retention time, no final semantics, and also no
> >> > > > > > business logic impact.
> >> > > > > >
> >> > > > > > On the other hand, for windowed aggregations, close time and final
> >> > > > > > result do have a meaning. IMHO, `close()` is part of business logic
> >> > > > > > while retention time is not. Also, suppression of intermediate results
> >> > > > > > is not a business rule, and there might be use cases for which either
> >> > > > > > only "early intermediate" results (before window end time) are
> >> > > > > > suppressed, or all intermediates are suppressed (maybe also something
> >> > > > > > in the middle, i.e., just reducing the load of intermediate updates).
> >> > > > > > Thus, window-suppression is much richer.
> >> > > > > >
> >> > > > > > IMHO, a generic `suppress()` operator that can be inserted into the data
> >> > > > > > flow at any point is useful. Maybe we should keep it as generic as
> >> > > > > > possible. However, it might be difficult to use with regard to
> >> > > > > > windowing, as the mental effort to use it is high.
> >> > > > > >
> >> > > > > > With regard to Guozhang's comment:
> >> > > > > >
> >> > > > > > > we will actually process data as old as 30 days as well, while most of
> >> > > > > > > the late updates beyond 5 minutes would be discarded anyways.
> >> > > > > >
> >> > > > > > If we use `suppress()` as a standalone operator, this is correct and
> >> > > > > > intended IMHO. To address the issue if the behavior is unwanted, I would
> >> > > > > > suggest adding a "suppress option" directly to the
> >> > > > > > `count()/reduce()/aggregate()` window operators, similar to
> >> > > > > > `Materialized`. This would be an "embedded suppress" and avoid the
> >> > > > > > issue. It would also address the issue about mental effort for the
> >> > > > > > "single final window result" use case.
> >> > > > > >
> >> > > > > > I also think that a shorter close-time than retention time is useful for
> >> > > > > > window aggregation. If we add close() to the window definition and
> >> > > > > > until() to `Materialized`, we can separate both correctly IMHO.
> >> > > > > >
> >> > > > > > About setting `close = min(close,retention)` I am not sure. We might
> >> > > > > > rather throw an exception than reduce the close time automatically.
> >> > > > > > Otherwise, I foresee many user questions like "I set close to X but it
> >> > > > > > does not get updated for some data that arrives with a delay of X".
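The two options Matthias contrasts can be sketched side by side (hypothetical helpers, values in milliseconds). Silently clamping hides the misconfiguration, while validating surfaces it when the topology is defined.

```java
// Hypothetical helpers contrasting two ways of reconciling close and retention.
final class CloseRetentionCheck {
    private CloseRetentionCheck() {}

    // Option 1: silently reduce close to the retention time.
    static long clampClose(final long closeMs, final long retentionMs) {
        return Math.min(closeMs, retentionMs);
    }

    // Option 2: reject an impossible configuration up front.
    static long validateClose(final long closeMs, final long retentionMs) {
        if (closeMs > retentionMs) {
            throw new IllegalArgumentException("close (" + closeMs
                + " ms) must not exceed retention (" + retentionMs + " ms)");
        }
        return closeMs;
    }
}
```

`clampClose(600_000, 300_000)` returns 300000, so a user who asked for 10 minutes silently gets 5; `validateClose(600_000, 300_000)` throws instead.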
> >> > > > > >
> >> > > > > > The tricky question might be to design the API in a backward compatible
> >> > > > > > way though.
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 7/3/18 5:38 AM, John Roesler wrote:
> >> > > > > > > Hi Guozhang,
> >> > > > > > >
> >> > > > > > > I see. It seems like if we want to decouple 1) and 2), we need to alter
> >> > > > > > > the definition of the window. Do you think it would close the gap if we
> >> > > > > > > added a "window close" time to the window definition?
> >> > > > > > >
> >> > > > > > > Such as:
> >> > > > > > >
> >> > > > > > > builder.stream("input")
> >> > > > > > > .groupByKey()
> >> > > > > > > .windowedBy(
> >> > > > > > >   TimeWindows
> >> > > > > > >     .of(60_000)
> >> > > > > > >     .closeAfter(10L * 60 * 1000)
> >> > > > > > >     .until(30L * 24 * 60 * 60 * 1000)
> >> > > > > > > )
> >> > > > > > > .count()
> >> > > > > > > .suppress(Suppression.finalResultsOnly());
> >> > > > > > >
> >> > > > > > > Possibly called "finalResultsAtWindowClose" or something?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > -John
> >> > > > > > >
> >> > > > > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > >> Hey John,
> >> > > > > > >>
> >> > > > > > >> Obviously I'm less diligent at replying to emails than you :)
> >> > > > > > >> I'll try to reply to them separately:
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> -----------------------------------------------------------------------------
> >> > > > > > >>
> >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 8:23 AM":
> >> > > > > > >>
> >> > > > > > >> I'm aware of this use case, but again, the concern is that, in this
> >> > > > > > >> setting, in order to let the window be queryable for 30 days, we will
> >> > > > > > >> actually process data as old as 30 days as well, while most of the late
> >> > > > > > >> updates beyond 5 minutes would be discarded anyways. Personally, I think
> >> > > > > > >> that for the final update scenario, the ideal situation users would want
> >> > > > > > >> is "do not process any data that is more than 5 minutes late, and of
> >> > > > > > >> course do not send update records downstream later than 5 minutes
> >> > > > > > >> either; but retain the window to be queryable for 30 days". By doing
> >> > > > > > >> that, the final window snapshot would also be aligned with the update
> >> > > > > > >> stream. In other words, among these three periods:
> >> > > > > > >>
> >> > > > > > >> 1) the retention length of the window / table.
> >> > > > > > >> 2) the late records acceptance for updating the window.
> >> > > > > > >> 3) the late records update to be sent downstream.
> >> > > > > > >>
> >> > > > > > >> Final update use cases would naturally want 2) = 3), while 1) may be
> >> > > > > > >> different and larger, whereas what we provide now is 1) = 2), which
> >> > > > > > >> could be different from, and in practice larger than, 3); hence it is
> >> > > > > > >> not the most intuitive for their needs.
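The mismatch can be seen in a small model of the three periods. This is a sketch with hypothetical names; it assumes all three periods are measured after the window end and treats each bound as exclusive. With today's coupling 1) = 2), a record one hour late is applied to the store but not sent downstream; with 2) = 3) and a longer 1), it is neither applied nor sent, while the window stays queryable.

```java
import java.time.Duration;

// Hypothetical model of the three periods; all are measured after the window end.
final class LatePeriods {
    // 1) retention, 2) late-record acceptance, 3) late-update emission.
    record Periods(Duration retention, Duration acceptance, Duration emission) {}

    private LatePeriods() {}

    // Is a record this late still applied to the window in the store?
    static boolean applied(final Periods p, final Duration lateness) {
        return lateness.compareTo(p.acceptance()) < 0;
    }

    // Is the resulting update still sent downstream?
    static boolean emitted(final Periods p, final Duration lateness) {
        return applied(p, lateness) && lateness.compareTo(p.emission()) < 0;
    }

    // Can the window still be queried this long after it ended?
    static boolean queryable(final Periods p, final Duration sinceWindowEnd) {
        return sinceWindowEnd.compareTo(p.retention()) < 0;
    }
}
```

For example, with `new Periods(ofDays(30), ofDays(30), ofMinutes(5))` the store and the downstream stream diverge for any record between 5 minutes and 30 days late; with `new Periods(ofDays(30), ofMinutes(5), ofMinutes(5))` they stay aligned.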
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> -----------------------------------------------------------------------------
> >> > > > > > >>
> >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
> >> > > > > > >>
> >> > > > > > >> I also like option 2) better than option 1) from a programming POV. But
> >> > > > > > >> I'm wondering if option 2) would provide the above semantics, or if it
> >> > > > > > >> still couples 1) with 2)?
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> Guozhang
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>
> >> > > > > > >>> In fact, to push the idea further (which IIRC is what Matthias
> >> > > > > > >>> originally proposed), if we can accept "Suppression#finalResultsOnly"
> >> > > > > > >>> in my last email, then we could also consider whether to eliminate
> >> > > > > > >>> "suppressLateEvents" entirely.
> >> > > > > > >>>
> >> > > > > > >>> We could always add it later, but you've both expressed doubt that
> >> > > > > > >>> there are practical use cases for it outside of final-results.
> >> > > > > > >>>
> >> > > > > > >>> -John
> >> > > > > > >>>
> >> > > > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>>
> >> > > > > > >>>> Hi again, Guozhang ;) Here's the second part of my response...
> >> > > > > > >>>>
> >> > > > > > >>>> It seems like your main concern is: "if I'm a user who wants final
> >> > > > > > >>>> update semantics, how complicated is it for me to get it?"
> >> > > > > > >>>>
> >> > > > > > >>>> I think we have to assume that people don't always have time to become
> >> > > > > > >>>> deeply familiar with all the nuances of a programming environment
> >> > > > > > >>>> before they use it. Especially if they're evaluating several
> >> > > > > > >>>> frameworks for their use case, it's very valuable to make it as
> >> > > > > > >>>> obvious as possible how to accomplish various computations with
> >> > > > > > >>>> Streams.
> >> > > > > > >>>>
> >> > > > > > >>>> To me the biggest question is whether, with a fresh perspective,
> >> > > > > > >>>> people would say "oh, I get it, I have to bound my lateness and
> >> > > > > > >>>> suppress intermediate updates, and of course I'll get only the final
> >> > > > > > >>>> result!", or if it's more like "wtf? all I want is the final result,
> >> > > > > > >>>> what are all these parameters?".
> >> > > > > > >>>>
> >> > > > > > >>>> I was talking with Matthias a while back, and he had an idea that I
> >> > > > > > >>>> think can help, which is to essentially set up a final-result recipe
> >> > > > > > >>>> in addition to the raw parameters. I previously thought that it
> >> > > > > > >>>> wouldn't be possible to restrict its usage to Windowed KTables, but
> >> > > > > > >>>> thinking about it again this weekend, I have a couple of ideas:
> >> > > > > > >>>>
> >> > > > > > >>>> ================
> >> > > > > > >>>> = 1. Static Wrapper =
> >> > > > > > >>>> ================
> >> > > > > > >>>> We can define an extra static function that "wraps" a KTable with
> >> > > > > > >>>> final-result semantics.
> >> > > > > > >>>>
> >> > > > > > >>>> public static <K extends Windowed, V> KTable<K, V> finalResultsOnly(
> >> > > > > > >>>>   final KTable<K, V> windowedKTable,
> >> > > > > > >>>>   final Duration maxAllowedLateness,
> >> > > > > > >>>>   final Suppression.BufferFullStrategy bufferFullStrategy) {
> >> > > > > > >>>>     return windowedKTable.suppress(
> >> > > > > > >>>>         Suppression.suppressLateEvents(maxAllowedLateness)
> >> > > > > > >>>>                    .suppressIntermediateEvents(
> >> > > > > > >>>>                      IntermediateSuppression
> >> > > > > > >>>>                        .emitAfter(maxAllowedLateness)
> >> > > > > > >>>>                        .bufferFullStrategy(bufferFullStrategy)
> >> > > > > > >>>>                    )
> >> > > > > > >>>>     );
> >> > > > > > >>>> }
> >> > > > > > >>>>
> >> > > > > > >>>> Because windowedKTable is a parameter, the static function can easily
> >> > > > > > >>>> impose an extra bound on the key type, that it extends Windowed. This
> >> > > > > > >>>> would make "final results only" only available on windowed ktables.
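The compile-time restriction relies only on a bounded type parameter, which can be sketched without Kafka on the classpath. `Windowed`, `Table` and `FinalResultsWrapper` below are hypothetical stand-ins for the Kafka Streams types, and the method body is a placeholder (it copies the rows rather than suppressing anything); the point is the signature.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a windowed key.
record Windowed<K>(K key, long start, long end) {}

// Hypothetical stand-in for a table: just a map of rows.
final class Table<K, V> {
    final Map<K, V> rows = new LinkedHashMap<>();
}

final class FinalResultsWrapper {
    private FinalResultsWrapper() {}

    // The bound "K extends Windowed<?>" makes this callable only on windowed tables.
    static <K extends Windowed<?>, V> Table<K, V> finalResultsOnly(final Table<K, V> windowedTable) {
        final Table<K, V> result = new Table<>();
        result.rows.putAll(windowedTable.rows); // placeholder for real suppression logic
        return result;
    }
}
```

Calling `FinalResultsWrapper.finalResultsOnly(new Table<String, String>())` fails to compile, because `String` does not satisfy the `Windowed<?>` bound, which mirrors the error shown below.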
> >> > > > > > >>>>
> >> > > > > > >>>> Here's how it would look to use:
> >> > > > > > >>>>
> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> >> > > > > > >>>>   finalResultsOnly(
> >> > > > > > >>>>     windowCounts,
> >> > > > > > >>>>     Duration.ofMinutes(10),
> >> > > > > > >>>>     Suppression.BufferFullStrategy.SHUT_DOWN
> >> > > > > > >>>>   );
> >> > > > > > >>>>
> >> > > > > > >>>> Trying to use it on a non-windowed KTable yields:
> >> > > > > > >>>>
> >> > > > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class
> >> > > > > > >>>>> org.apache.kafka.streams.kstream.internals.KTableAggregateTest cannot be
> >> > > > > > >>>>> applied to given types;
> >> > > > > > >>>>>   required:
> >> > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> >> > > > > > >>>>>   found:
> >> > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<java.lang.String,java.lang.String>,java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> >> > > > > > >>>>>   reason: inference variable K has incompatible bounds
> >> > > > > > >>>>>     equality constraints: java.lang.String
> >> > > > > > >>>>>     upper bounds: org.apache.kafka.streams.kstream.Windowed
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>> ============================================================
> >> > > > > > >>>> = 2. Add <K,V> parameters and recipe method to Suppression =
> >> > > > > > >>>> ============================================================
> >> > > > > > >>>>
> >> > > > > > >>>> By adding K,V parameters to Suppression, we can provide a similarly
> >> > > > > > >>>> bounded config method directly on the Suppression class:
> >> > > > > > >>>>
> >> > > > > > >>>> public static <K extends Windowed, V> Suppression<K, V> finalResultsOnly(
> >> > > > > > >>>>   final Duration maxAllowedLateness,
> >> > > > > > >>>>   final BufferFullStrategy bufferFullStrategy) {
> >> > > > > > >>>>     return Suppression
> >> > > > > > >>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
> >> > > > > > >>>>         .suppressIntermediateEvents(IntermediateSuppression
> >> > > > > > >>>>             .emitAfter(maxAllowedLateness)
> >> > > > > > >>>>             .bufferFullStrategy(bufferFullStrategy)
> >> > > > > > >>>>         );
> >> > > > > > >>>> }
> >> > > > > > >>>>
> >> > > > > > >>>> Then, here's how it would look to use it:
> >> > > > > > >>>>
> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts = ...
> >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> >> > > > > > >>>>   windowCounts.suppress(
> >> > > > > > >>>>     Suppression.finalResultsOnly(
> >> > > > > > >>>>       Duration.ofMinutes(10),
> >> > > > > > >>>>       Suppression.BufferFullStrategy.SHUT_DOWN
> >> > > > > > >>>>     )
> >> > > > > > >>>>   );
> >> > > > > > >>>>
> >> > > > > > >>>> Trying to use it on a non-windowed ktable yields:
> >> > > > > > >>>>
> >> > > > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class
> >> > > > > > >>>>> org.apache.kafka.streams.kstream.Suppression<K,V> cannot be applied to
> >> > > > > > >>>>> given types;
> >> > > > > > >>>>>   required:
> >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> >> > > > > > >>>>>   found:
> >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.Suppression.BufferFullStrategy
> >> > > > > > >>>>>   reason: explicit type argument java.lang.String does not conform to
> >> > > > > > >>>>> declared bound(s) org.apache.kafka.streams.kstream.Windowed
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>>
> >> > > > > > >>>> ============
> >> > > > > > >>>> = Downsides =
> >> > > > > > >>>> ============
> >> > > > > > >>>>
> >> > > > > > >>>> Of course, there's a downside either way:
> >> > > > > > >>>> * for 1: this "wrapper" interaction would be the first in the DSL. Is
> >> > > > > > >>>> it too strange, and how discoverable would it be?
> >> > > > > > >>>> * for 2: adding those type parameters to Suppression will force all
> >> > > > > > >>>> callers to provide them in the event of a chained construction,
> >> > > > > > >>>> because Java doesn't propagate the target type back through a chain
> >> > > > > > >>>> of method calls. This is already visible in other parts of the
> >> > > > > > >>>> Streams DSL. For example, calls to Materialized builders often have
> >> > > > > > >>>> to provide seemingly obvious type bounds.
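The inference limitation behind the second downside can be reproduced in a few lines. `SuppressionConfig` and `TypeWitnessDemo` are hypothetical classes, not the proposed `Suppression` API. A lone factory call picks up its type arguments from the assignment or return target, but once it becomes the receiver of a chained call, the arguments are inferred as `Object` and an explicit type witness is needed.

```java
// Hypothetical generic config, used only to demonstrate Java's type inference.
final class SuppressionConfig<K, V> {
    private final long maxKeys;

    private SuppressionConfig(final long maxKeys) {
        this.maxKeys = maxKeys;
    }

    static <K, V> SuppressionConfig<K, V> unbounded() {
        return new SuppressionConfig<>(Long.MAX_VALUE);
    }

    SuppressionConfig<K, V> withMaxKeys(final long n) {
        return new SuppressionConfig<>(n);
    }

    long maxKeys() {
        return maxKeys;
    }
}

final class TypeWitnessDemo {
    private TypeWitnessDemo() {}

    // Compiles: a lone call infers K and V from the return type.
    static SuppressionConfig<String, Long> plain() {
        return SuppressionConfig.unbounded();
    }

    static SuppressionConfig<String, Long> chained() {
        // Would not compile: unbounded() is a receiver here, so K and V become Object.
        // return SuppressionConfig.unbounded().withMaxKeys(10_000L);
        // Compiles: the explicit type witness fixes K and V before the chained call.
        return SuppressionConfig.<String, Long>unbounded().withMaxKeys(10_000L);
    }
}
```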
> >> > > > > > >>>>
> >> > > > > > >>>> ============
> >> > > > > > >>>> = Conclusion =
> >> > > > > > >>>> ============
> >> > > > > > >>>>
> >> > > > > > >>>> I think option 2 is more "normal" and discoverable. It does have a
> >> > > > > > >>>> downside, but it's one that's pre-existing elsewhere in the DSL.
> >> > > > > > >>>>
> >> > > > > > >>>> WDYT? Would the addition of this "recipe" method to Suppression
> >> > > > > > >>>> resolve your concern?
> >> > > > > > >>>>
> >> > > > > > >>>> Thanks again,
> >> > > > > > >>>> -John
> >> > > > > > >>>>
> >> > > > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> > > > > > >>>>
> >> > > > > > >>>>> Hi John,
> >> > > > > > >>>>>
> >> > > > > > >>>>> Regarding the metrics: yeah, I think I'm with you that records
> >> > > > > > >>>>> dropped due to window retention or emit suppression policies should
> >> > > > > > >>>>> be recorded differently, and using this KIP's proposed metric would
> >> > > > > > >>>>> be fine. If you also think we can use this KIP's proposed metrics to
> >> > > > > > >>>>> cover records skipped due to window retention, then we can include
> >> > > > > > >>>>> those changes in this KIP as well.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Regarding the current proposal, I'm actually not too worried about the
> >> > > > > > >>>>> inconsistency between query semantics and downstream emit semantics. For
> >> > > > > > >>>>> queries, we will always return the current running results of the
> >> > > > > > >>>>> windows, be they partial or final results depending on the window
> >> > > > > > >>>>> retention time, which has nothing to do with whether the emitted stream
> >> > > > > > >>>>> should be one final output per key or not. I also agree that having a
> >> > > > > > >>>>> unified operation is generally better, since users can focus on that one
> >> > > > > > >>>>> operation rather than learning two sets of operations. The only question
> >> > > > > > >>>>> I had is whether, for final updates of window stores, the configuration
> >> > > > > > >>>>> combo is a bit awkward to understand. Thinking about this more, I think
> >> > > > > > >>>>> my root worry is the "suppressLateEvents" call for windowed tables, since
> >> > > > > > >>>>> from a user perspective: if my retention time is X, which means "pay the
> >> > > > > > >>>>> cost to allow records up to X late to still be applied to the tables",
> >> > > > > > >>>>> why would I ever want to suppressLateEvents by Y (< X), i.e. say "do not
> >> > > > > > >>>>> send updates that arrive later than Y", which means the downstream
> >> > > > > > >>>>> operator or sink topic for this stream would see a truncated update
> >> > > > > > >>>>> stream while I've paid the larger cost? And of course, Y > X would not
> >> > > > > > >>>>> make sense either, as you would not see any updates later than X anyway.
> >> > > > > > >>>>> So in all, my feeling is that it makes little sense for a windowed
> >> > > > > > >>>>> table's "suppressLateEvents" to take a parameter that is not equal to
> >> > > > > > >>>>> the window retention, and opening that door in the current proposal may
> >> > > > > > >>>>> confuse people.
> >> > > > > > >>>>>
> >> > > > > > >>>>> Again, the above is just a subjective opinion, and we could probably also
> >> > > > > > >>>>> come up with some scenarios in which users do want to set X != Y. But
> >> > > > > > >>>>> personally I feel that even if the semantics of such a scenario are
> >> > > > > > >>>>> intuitive for users to understand, does it really make sense, and should
> >> > > > > > >>>>> we really open the door for it? So I think the benefits of putting the
> >> > > > > > >>>>> final update in a separate API may outweigh the advantage of having one
> >> > > > > > >>>>> uniform definition. As for my alternative proposal, the rationale came
> >> > > > > > >>>>> from both my concern about "suppressLateEvents" for windowed stores and
> >> > > > > > >>>>> Matthias' question about "suppressLateEvents" for non-windowed stores:
> >> > > > > > >>>>> if it is less meaningful for both, we can consider removing it
> >> > > > > > >>>>> completely and only do "IntermediateSuppression" in Suppress instead.
> >> > > > > > >>>>>
> >> > > > > > >>>>> So I'd summarize my thoughts in the following questions:
> >> > > > > > >>>>>
> >> > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X (window retention
> >> > > > > > >>>>>    time) for windowed stores make sense in practice?
> >> > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for non-windowed
> >> > > > > > >>>>>    stores make sense in practice?
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>> Guozhang
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >> > > > > > >>>>>
> >> > > > > > >>>>>> Thanks for the explanation, that does make sense. I have some questions
> >> > > > > > >>>>>> on operations, but I'll just wait for the PR and tests.
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> Thanks,
> >> > > > > > >>>>>> Bill
> >> > > > > > >>>>>>
> >> > > > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>>>>>
> >> > > > > > >>>>>>> Hi Bill,
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Thanks for the review!
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Your question is very much applicable to the KIP and not at all an
> >> > > > > > >>>>>>> implementation detail. Thanks for bringing it up.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> I'm proposing not to change the existing caches and configurations at
> >> > > > > > >>>>>>> all (for now).
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Imagine you have a topology like this:
> >> > > > > > >>>>>>> commit.interval.ms = 100
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> The first ktable (ktable1) will respect the commit interval and buffer
> >> > > > > > >>>>>>> events for 100ms before logging, storing, or forwarding them (IIRC).
> >> > > > > > >>>>>>> Therefore, the second ktable (suppress) will only see the events at a
> >> > > > > > >>>>>>> rate of once per 100ms. It will apply its own buffering and emit once
> >> > > > > > >>>>>>> per 200ms. This case is pretty trivial because the suppress time is a
> >> > > > > > >>>>>>> multiple of the commit interval.
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> When it's not an integer multiple, you'll get behavior like in this
> >> > > > > > >>>>>>> marble diagram:
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> [ KTable caching with commit interval = 2 ]
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>       [ suppress with emitAfter = 3 ]
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> <---------------(k:2)----------------(k:6)->
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> If this behavior isn't desired (for example, if you wanted to emit
> >> > > > > > >>>>>>> (k:3) at time 3), I'd recommend setting "cache.max.bytes.buffering" to
> >> > > > > > >>>>>>> 0 or modifying the topology to disable caching. Then, the behavior is
> >> > > > > > >>>>>>> determined by the suppress operator alone.
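A small, hypothetical discrete-time model reproduces the marble diagram above. It assumes that both the cache and the suppress operator keep only the latest value per key and flush whenever the tick is a multiple of their interval (the commit interval and emitAfter, respectively). `MarbleModel`, `drainIfDue`, and `run` are illustrative names; this is not how Kafka Streams implements caching or suppression.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical discrete-time model of the marble diagram; NOT Kafka Streams code.
// Both stages keep only the latest value per key and forward their whole
// buffer whenever the current tick is a multiple of their interval.
final class MarbleModel {

    // Drains `buffer` into the returned map if tick `t` is a flush point;
    // otherwise returns an empty map and leaves `buffer` untouched.
    static Map<String, Integer> drainIfDue(long t, long interval, Map<String, Integer> buffer) {
        Map<String, Integer> out = new LinkedHashMap<>();
        if (t % interval == 0) {
            out.putAll(buffer);
            buffer.clear();
        }
        return out;
    }

    // values[i] is the update for key "k" arriving at tick i + 1.
    // A commitInterval of 1 approximates a disabled cache (every update is forwarded).
    static List<String> run(int[] values, long commitInterval, long emitAfter) {
        Map<String, Integer> cache = new LinkedHashMap<>();
        Map<String, Integer> suppressBuffer = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            long t = i + 1;
            cache.put("k", values[i]);
            // Cached KTable: forward the latest value per key at each commit.
            suppressBuffer.putAll(drainIfDue(t, commitInterval, cache));
            // Suppress: forward the latest buffered value per key every emitAfter ticks.
            for (Map.Entry<String, Integer> e : drainIfDue(t, emitAfter, suppressBuffer).entrySet()) {
                emitted.add(e.getKey() + ":" + e.getValue() + "@t" + t);
            }
        }
        return emitted;
    }
}
```

`run(new int[] {1, 2, 3, 4, 5, 6}, 2, 3)` yields `[k:2@t3, k:6@t6]`, matching the last line of the diagram. Passing a commit interval of 1 (roughly, caching disabled) yields `[k:3@t3, k:6@t6]`, i.e. (k:3) is emitted at time 3.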
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Does that seem right to you?
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Regarding the changelogs, because the suppression operator hangs onto
> >> > > > > > >>>>>>> events for a while, it will need its own changelog. The changelog
> >> > > > > > >>>>>>> should represent the current state of the buffer at all times. So when
> >> > > > > > >>>>>>> the suppress operator sees (k:2), for example, it will log (k:2). When
> >> > > > > > >>>>>>> it later gets to time 3, it's time to emit (k:2) downstream. Because k
> >> > > > > > >>>>>>> is no longer buffered, the suppress operator will log (k:null). Thus,
> >> > > > > > >>>>>>> when recovering, it can rebuild the buffer by reading its changelog.
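The changelog scheme just described (log the value when a key is buffered, log a null tombstone when it is emitted, replay on recovery) can be sketched as follows. This is a hypothetical in-memory model; `ChangeloggedBuffer`, `put`, `emit`, and `restore` are illustrative names, and a real changelog would be a compacted Kafka topic rather than a `List`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a changelogged suppression buffer; NOT Kafka Streams code.
final class ChangeloggedBuffer {
    // Every change to the buffer is appended here; a null value is a tombstone.
    final List<Map.Entry<String, String>> changelog = new ArrayList<>();
    final Map<String, String> buffer = new LinkedHashMap<>();

    // Buffer (or overwrite) a key and log the newly buffered value.
    void put(String key, String value) {
        buffer.put(key, value);
        changelog.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    // Emit a key downstream: remove it from the buffer and log a tombstone.
    String emit(String key) {
        String value = buffer.remove(key);
        if (value != null) {
            changelog.add(new AbstractMap.SimpleEntry<>(key, null));
        }
        return value;
    }

    // Recovery: replay the changelog in order to rebuild the buffer contents.
    static Map<String, String> restore(List<Map.Entry<String, String>> changelog) {
        Map<String, String> rebuilt = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : changelog) {
            if (e.getValue() == null) {
                rebuilt.remove(e.getKey());
            } else {
                rebuilt.put(e.getKey(), e.getValue());
            }
        }
        return rebuilt;
    }
}
```

`AbstractMap.SimpleEntry` is used because, unlike `Map.entry`, it accepts a null value for the tombstone. After `put("k", "2")`, `put("j", "5")`, and `emit("k")`, the changelog holds three entries and replaying it rebuilds a buffer containing only `j=5`.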
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> What do you think about this?
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> Thanks,
> >> > > > > > >>>>>>> -John
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>>> Hi John,  thanks for the KIP.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Early on in the KIP, you mention the current approaches for
> >> > > > > > >>>>>>>> controlling the rate of downstream records from a KTable: cache size
> >> > > > > > >>>>>>>> configuration and commit time.
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Will these configuration parameters still be in effect for tables
> >> > > > > > >>>>>>>> that don't use suppression? For tables taking advantage of
> >> > > > > > >>>>>>>> suppression, will these configurations have no impact?
> >> > > > > > >>>>>>>> This last question may be too implementation-specific, but if the
> >> > > > > > >>>>>>>> requested suppression time is longer than the specified commit time,
> >> > > > > > >>>>>>>> will the latest record in the suppression buffer get stored in a
> >> > > > > > >>>>>>>> changelog?
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> Thanks,
> >> > > > > > >>>>>>>> Bill
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>>> Thanks for the feedback, Matthias,
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> It seems like in straightforward relational processing cases, it
> >> > > > > > >>>>>>>>> would not make sense to bound the lateness of KTables. In general, it
> >> > > > > > >>>>>>>>> seems better to have "guard rails" in place that make it easier to
> >> > > > > > >>>>>>>>> write sensible programs than insensible ones.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> But I'm still going to argue in favor of keeping it for all KTables ;)
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 1. I believe it is simpler to understand the operator if it has one
> >> > > > > > >>>>>>>>> uniform definition, regardless of context. It's well defined and
> >> > > > > > >>>>>>>>> intuitive what will happen when you use late-event suppression on a
> >> > > > > > >>>>>>>>> KTable, so I think nothing surprising or dangerous will happen in that
> >> > > > > > >>>>>>>>> case. From my perspective, having two sets of allowed operations is
> >> > > > > > >>>>>>>>> actually an increase in cognitive complexity.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this way. For example, in
> >> > > > > > >>>>>>>>> lieu of full-featured timestamp semantics, I can implement MVCC
> >> > > > > > >>>>>>>>> behavior when building a KTable by "suppressLateEvents(Duration.ZERO)".
> >> > > > > > >>>>>>>>> I suspect that there are other, non-obvious applications of
> >> > > > > > >>>>>>>>> suppressing late events on KTables.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> 3. Not to get too much into implementation details in a KIP
> >> > > > > > >>>>>>>>> discussion, but if we did want to make late-event suppression
> >> > > > > > >>>>>>>>> available only on windowed KTables, we have two enforcement options:
> >> > > > > > >>>>>>>>>   a. check when we build the topology: this would be simple to
> >> > > > > > >>>>>>>>>      implement, but would be a runtime check. Hopefully, people write
> >> > > > > > >>>>>>>>>      tests for their topology before deploying it, so the feedback
> >> > > > > > >>>>>>>>>      loop isn't instantaneous, but it's not too long either.
> >> > > > > > >>>>>>>>>   b. add a new WindowedKTable type: this would be a compile-time
> >> > > > > > >>>>>>>>>      check, but would also be a substantial increase in both interface
> >> > > > > > >>>>>>>>>      and code complexity.
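To make the difference between options (a) and (b) concrete, here is a hypothetical sketch with made-up `Table`, `PlainTable`, and `WindowedTable` types (not the Kafka Streams `KTable` API). Under option (a) the misuse is rejected when the operator is added while building the topology; under option (b) the call cannot even be written on a non-windowed table.

```java
import java.time.Duration;

// Hypothetical types illustrating the two enforcement options; NOT Kafka Streams classes.
final class EnforcementSketch {

    // Option (a): a single table type; misuse is rejected at topology-build time (runtime).
    static final class Table {
        final boolean windowed;

        Table(boolean windowed) { this.windowed = windowed; }

        // The grace duration is unused in this sketch; only the check matters.
        Table suppressLateEvents(Duration grace) {
            if (!windowed) {
                throw new IllegalStateException("suppressLateEvents is only allowed on windowed tables");
            }
            return this;
        }
    }

    // Option (b): a dedicated windowed type. A PlainTable has no
    // suppressLateEvents method at all, so misuse fails to compile.
    static class PlainTable { }

    static final class WindowedTable extends PlainTable {
        WindowedTable suppressLateEvents(Duration grace) { return this; }
    }
}
```

Option (b) needs a second type (and, in a real DSL, a parallel set of methods returning it), which is the interface and code cost weighed above.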
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> We should definitely strive to have guard rails protecting against
> >> > > > > > >>>>>>>>> surprising or dangerous behavior. Protecting against programs that we
> >> > > > > > >>>>>>>>> don't currently predict is a lesser benefit, and I think we can put up
> >> > > > > > >>>>>>>>> guard rails for those on a case-by-case basis. The increase in
> >> > > > > > >>>>>>>>> cognitive (and potentially code and interface) complexity makes me
> >> > > > > > >>>>>>>>> think we should skip this case.
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> What do you think?
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> Thanks,
> >> > > > > > >>>>>>>>> -John
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>>> Thanks for the KIP John.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> One initial comment about the last example, "Bounded lateness": for a
> >> > > > > > >>>>>>>>>> non-windowed KTable, bounding the lateness does not really make
> >> > > > > > >>>>>>>>>> sense, does it?
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> Thus, I am wondering whether we should allow `suppressLateEvents()`
> >> > > > > > >>>>>>>>>> in this case at all. It seems better to only allow it for windowed
> >> > > > > > >>>>>>>>>> KTables.
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> -Matthias
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> >> > > > > > >>>>>>>>>>> I noticed this (lack of a primary parameter) as well.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> What you gave as the new example is semantically the same as what I
> >> > > > > > >>>>>>>>>>> suggested, so it is good by me.
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> Thanks
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Thanks for taking a look, Ted,
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> I agree this is a departure from the conventions of the Streams DSL.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Most of our config objects have one or two "required" parameters,
> >> > > > > > >>>>>>>>>>>> which fit naturally with the static factory method approach.
> >> > > > > > >>>>>>>>>>>> TimeWindows, for example, requires a size parameter, so we can
> >> > > > > > >>>>>>>>>>>> naturally say TimeWindows.of(size).
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> I think in the case of a suppression, there's really no "core"
> >> > > > > > >>>>>>>>>>>> parameter, and "Suppression.of()" seems sillier than
> >> > > > > > >>>>>>>>>>>> "new Suppression()". I think that Suppression.of(duration) would be
> >> > > > > > >>>>>>>>>>>> ambiguous, since there are many durations that we can configure.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose that I can give each
> >> > > > > > >>>>>>>>>>>> configuration method a static version, which would let you replace
> >> > > > > > >>>>>>>>>>>> "new Suppression()." with "Suppression." in all the examples.
> >> > > > > > >>>>>>>>>>>> Basically, instead of "of()", we'd support any of the methods I
> >> > > > > > >>>>>>>>>>>> listed.
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> For example:
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> windowCounts
> >> > > > > > >>>>>>>>>>>>     .suppress(
> >> > > > > > >>>>>>>>>>>>         Suppression
> >> > > > > > >>>>>>>>>>>>             .suppressLateEvents(Duration.ofMinutes(10))
> >> > > > > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> >> > > > > > >>>>>>>>>>>>                 IntermediateSuppression.emitAfter(Duration.ofMinutes(10))
> >> > > > > > >>>>>>>>>>>>             )
> >> > > > > > >>>>>>>>>>>>     );
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Does that seem better?
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> Thanks,
> >> > > > > > >>>>>>>>>>>> -John
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <yuzhih...@gmail.com> wrote:
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> I started to read this KIP, which contains a lot of material.
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> One suggestion:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>     .suppress(
> >> > > > > > >>>>>>>>>>>>>         new Suppression()
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Do you think it would be more consistent with the rest of the Streams
> >> > > > > > >>>>>>>>>>>>> data structures to support `of`?
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Suppression.of(Duration.ofMinutes(10))
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> Cheers
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>> On Tue, Jun 26, 2018 at 1:11 PM, John Roesler <j...@confluent.io> wrote:
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> Hello devs and users,
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> Please take some time to consider this proposal for Kafka Streams:
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> KIP-328: Ability to suppress updates for KTables
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> link: https://cwiki.apache.org/confluence/x/sQU0BQ
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> The basic idea is to provide:
> >> > > > > > >>>>>>>>>>>>>> * more usable control over update rate (vs. the current state store
> >> > > > > > >>>>>>>>>>>>>>   caches)
> >> > > > > > >>>>>>>>>>>>>> * the final-result-for-windowed-computations feature, which several
> >> > > > > > >>>>>>>>>>>>>>   people have requested
> >> > > > > > >>>>>>>>>>>>>> I look forward to your feedback!
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>> Thanks,
> >> > > > > > >>>>>>>>>>>>>> -John
> >> > > > > > >>>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>>
> >> > > > > > >>>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>>
> >> > > > > > >>>>>>>>>
> >> > > > > > >>>>>>>>
> >> > > > > > >>>>>>>
> >> > > > > > >>>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>>
> >> > > > > > >>>>> --
> >> > > > > > >>>>> -- Guozhang
> >> > > > > > >>>>>
> >> > > > > > >>>>
> >> > > > > > >>>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >>
> >> > > > > > >> --
> >> > > > > > >> -- Guozhang
> >> > > > > > >>
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>



-- 
-- Guozhang
