Hi all,

I have updated KIP-328 with all the feedback I've gotten so far. Please
take another look and let me know what you think!

Thanks,
-John

On Wed, Jul 11, 2018 at 12:28 AM Guozhang Wang <wangg...@gmail.com> wrote:

> That is a good point..
>
> I cannot think of a better option than documentation and warning, and also
> given that we'd probably better not reusing the function name `until` for
> close time.
>
>
> Guozhang
>
>
> On Tue, Jul 10, 2018 at 3:31 PM, John Roesler <j...@confluent.io> wrote:
>
> > I had some opportunity to reflect on the default for close time today...
> >
> > Note that the current "close time" is equal to the retention time, and
> > therefore "close" today shares the default retention of 24h.
> >
> > It would definitely break any application that today specifies a
> retention
> > time to set close shorter than that time. It's also likely to break apps
> if
> > they *don't* set the retention time and rely on the 24h default. So it's
> > unfortunate, but I think if "close" isn't set, we should use the
> retention
> > time instead of a fixed default.
> >
> > When we ultimately remove the retention time parameter ("until"), we will
> > have to set "close" to a default of 24h.
> >
> > Of course, this has a negative impact on the user of "final results",
> since
> > they won't see any output at all for retentionTime/24h, and may find this
> > confusing. What can we do about this except document it well? Maybe log a
> > warning if we see that close wasn't explicitly set while using "final
> > results"?
> >
> > Thanks,
> > -John
> >
> > On Tue, Jul 10, 2018 at 10:46 AM John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Guozhang,
> > >
> > > That sounds good to me. I'll include that in the KIP.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, Jul 9, 2018 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > >> Let me clarify a bit on what I meant about moving `retentionPeriod` to
> > >> WindowStoreBuilder:
> > >>
> > >> In another discussion we had around KIP-319 / 330, that the "retention
> > >> period" should not really be a window spec, but only a window store
> > spec,
> > >> as it only affects how long to retain each window to be queryable
> along
> > >> with the storage cost.
> > >>
> > >> More specifically, today the "maintainMs" returned from Windows is
> used
> > in
> > >> three places:
> > >>
> > >> 1) for windowed aggregations, they are passed in directly into
> > >> `Stores.persistentWindows()` as the retention period parameters. For
> > this
> > >> use case we should just let the WindowStoreBuilder to specify this
> value
> > >> itself.
> > >>
> > >> NOTE: It is also returned in the KStreamWindowAggregate processor, to
> > >> determine if a received record should be dropped due to its lateness.
> We
> > >> may need to think of another way to get this value inside the
> processor
> > >>
> > >> 2) for windowed stream-stream join, it is used as the join range
> > parameter
> > >> but only to check that "windowSizeMs <= retentionPeriodMs". We can do
> > this
> > >> check at the store builder lever instead of at the processor level.
> > >>
> > >>
> > >> If we can remove its usage in both 1) and 2), then we should be able
> to
> > >> safely remove this from the `Windows` spec.
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >> On Mon, Jul 9, 2018 at 3:53 PM, John Roesler <j...@confluent.io>
> wrote:
> > >>
> > >> > Thanks for the reply, Guozhang,
> > >> >
> > >> > Good! I agree, that is also a good reason, and I actually made use
> of
> > >> that
> > >> > in my tests. I'll update the KIP.
> > >> >
> > >> > By the way, I chose "allowedLateness" as I was trying to pick a
> better
> > >> name
> > >> > than "close", but I think it's actually the wrong name. We don't
> want
> > to
> > >> > bound the lateness of events in general, only with respect to the
> end
> > of
> > >> > their window.
> > >> >
> > >> > If we have a window [0,10), with "allowedLateness" of 5, then if we
> > get
> > >> an
> > >> > event with timestamp 3 at time 9, the name implies we'd reject it,
> > which
> > >> > seems silly. Really, we'd only want to start rejecting that event at
> > >> stream
> > >> > time 15.
> > >> >
> > >> > What I meant was more like "allowedLatenessAfterWindowEnd", but
> > that's
> > >> too
> > >> > verbose. I think that "close" + some documentation about what it
> means
> > >> will
> > >> > be better.
> > >> >
> > >> > 1: "Close" would be measured from the end of the window, so a
> > reasonable
> > >> > default would be "0". Recall that "close" really only needs to be
> > >> specified
> > >> > for final results, and a default of 0 would produce the most
> intuitive
> > >> > results. If folks later discover that they are missing some late
> > events,
> > >> > they can adjust the parameter accordingly. IMHO, any other value
> would
> > >> just
> > >> > be a guess on our part.
> > >> >
> > >> > 2a:
> > >> > I think you're saying to re-use "until" instead of adding "close" to
> > the
> > >> > window.
> > >> >
> > >> > The downside here would be that the semantic change could be more
> > >> confusing
> > >> > than deprecating "until" and introducing window "close" and a
> > >> > "retentionTime" on the store builder. The deprecation is a good,
> > >> controlled
> > >> > way for us to make sure people are getting the semantics they think
> > >> they're
> > >> > getting, as well as giving us an opportunity to link people to the
> API
> > >> they
> > >> > should use instead.
> > >> >
> > >> > I didn't fully understand the second part, but it sounds like you're
> > >> > suggesting to add a new "retentionTime" setter to Windows to bridge
> > the
> > >> gap
> > >> > until we add it to the store builder? That seems kind of roundabout
> to
> > >> me,
> > >> > if that's what you meant. We could just immediately add it to the
> > store
> > >> > builders in the same PR.
> > >> >
> > >> > 2b: Sounds good to me!
> > >> >
> > >> > Thanks again,
> > >> > -John
> > >> >
> > >> >
> > >> > On Mon, Jul 9, 2018 at 4:55 PM Guozhang Wang <wangg...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > John,
> > >> > >
> > >> > > Thanks for your replies. As for the two options of the API, I
> think
> > >> I'm
> > >> > > slightly inclined to the first option as well. My motivation is a
> > bit
> > >> > > different, as I think of the first one maybe more flexible, for
> > >> example:
> > >> > >
> > >> > > KTable<Windowed<..>> table = ... count();
> > >> > >
> > >> > > table.toStream().peek(..);   // want to peek at the changelog
> > stream,
> > >> do
> > >> > > not care about final results.
> > >> > >
> > >> > > table.suppress().toStream().to("topic");    // sending to a topic,
> > >> want
> > >> > to
> > >> > > only send the final results.
> > >> > >
> > >> > > --------------
> > >> > >
> > >> > > Besides that, I have a few more minor questions:
> > >> > >
> > >> > > 1. For "allowedLateness", what should be the default value? I.e.
> if
> > >> user
> > >> > do
> > >> > > not specify "allowedLateness" in TimeWindows, what value should we
> > >> set?
> > >> > >
> > >> > > 2. For API names, some personal suggestions here:
> > >> > >
> > >> > > 2.a) "allowedLateness"  -> "until" (semantics changed, and also
> > value
> > >> is
> > >> > > defined as delta on top of window length), where "until" ->
> > >> > > "retentionPeriod", and the latter will be removed from `Windows`
> to
> > `
> > >> > > WindowStoreBuilder` in the future.
> > >> > >
> > >> > > 2.b) "BufferConfig" -> "Buffered" ?
> > >> > >
> > >> > >
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > >
> > >> > > On Mon, Jul 9, 2018 at 2:09 PM, John Roesler <j...@confluent.io>
> > >> wrote:
> > >> > >
> > >> > > > Hey Matthias and Guozhang,
> > >> > > >
> > >> > > > Sorry for the slow reply. I was mulling about your feedback and
> > >> > weighing
> > >> > > > some ideas in a sketchbook PR: https://github.com/apache/
> > >> > kafka/pull/5337
> > >> > > .
> > >> > > >
> > >> > > > Your thought about keeping suppression independent of business
> > logic
> > >> > is a
> > >> > > > very good one. I agree that it would make more sense to add some
> > >> kind
> > >> > of
> > >> > > > "window close" concept to the window definition.
> > >> > > >
> > >> > > > In fact, doing that immediately solves the inconsistency problem
> > >> > Guozhang
> > >> > > > brought up. There's no need to add a "final results" or
> "emission"
> > >> > option
> > >> > > > to the windowed aggregation.
> > >> > > >
> > >> > > > What do you think about an API more like this:
> > >> > > >
> > >> > > > final StreamsBuilder builder = new StreamsBuilder();
> > >> > > >
> > >> > > > builder
> > >> > > >   .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
> > >> > > >   .groupBy(
> > >> > > >     (String k1, String v1) -> k1,
> > >> > > >     Serialized.with(STRING_SERDE, STRING_SERDE)
> > >> > > >   )
> > >> > > >   .windowedBy(TimeWindows
> > >> > > >     .of(scaledTime(2L))
> > >> > > >     .until(scaledTime(3L))
> > >> > > >     .allowedLateness(scaledTime(1L))
> > >> > > >   )
> > >> > > >   .count(Materialized.as("counts"))
> > >> > > >   .suppress(
> > >> > > >     emitFinalResultsOnly(
> > >> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(
> > >> > SHUT_DOWN)
> > >> > > >     )
> > >> > > >   )
> > >> > > >   .toStream()
> > >> > > >   .to("output-suppressed", Produced.with(STRING_SERDE,
> > LONG_SERDE));
> > >> > > >
> > >> > > > Note that:
> > >> > > >  * "emitFinalResultsOnly" is available *only* on windowed tables
> > >> > > (enforced
> > >> > > > by the type system at compile time), and it determines the time
> to
> > >> wait
> > >> > > by
> > >> > > > looking at "allowedLateness" on the TimeWindows config.
> > >> > > >  * querying "counts" will produce results (eventually)
> consistent
> > >> with
> > >> > > > what's observable in "output-suppressed".
> > >> > > >  * in all cases, "suppress" has no effect on business logic,
> just
> > on
> > >> > > event
> > >> > > > suppression.
> > >> > > >
> > >> > > > Is this API straightforward? Or do you still prefer the version
> > that
> > >> > both
> > >> > > > proposed:
> > >> > > >
> > >> > > >   ...
> > >> > > >   .windowedBy(TimeWindows
> > >> > > >     .of(scaledTime(2L))
> > >> > > >     .until(scaledTime(3L))
> > >> > > >     .allowedLateness(scaledTime(1L))
> > >> > > >   )
> > >> > > >   .count(
> > >> > > >     Materialized.as("counts"),
> > >> > > >     emitFinalResultsOnly(
> > >> > > >       BufferConfig.withBufferKeys(10_000L).bufferFullStrategy(
> > >> > SHUT_DOWN)
> > >> > > >     )
> > >> > > >   )
> > >> > > >   ...
> > >> > > >
> > >> > > > To me, these two are practically identical, and I still vaguely
> > >> prefer
> > >> > > the
> > >> > > > first one.
> > >> > > >
> > >> > > > The prototype has made clearer to me that users of "final
> results
> > >> for
> > >> > > > windows" and users of "suppression for table events" both need
> to
> > >> > > configure
> > >> > > > the suppression buffer.
> > >> > > >
> > >> > > > This buffer configuration consists of:
> > >> > > > 1. how many keys or bytes to keep in memory
> > >> > > > 2. what to do if memory runs out (shut down, start using disk,
> > ...)
> > >> > > >
> > >> > > > So it's not as simple as setting a "final results" flag. We'll
> > >> either
> > >> > > have
> > >> > > > an "Emit" config object on the windowed aggregators that takes
> the
> > >> same
> > >> > > > BufferConfig that the "Suppress" config on the suppression
> > >> operator, or
> > >> > > we
> > >> > > > just use the suppression operator for both.
> > >> > > >
> > >> > > > Perhaps it would sweeten the deal a little to point out that we
> > >> have 2
> > >> > > > overloads already for each windowed aggregator (with and without
> > >> > > > Materialized). Adding "Emitted" or something would mean that
> we'd
> > >> add a
> > >> > > new
> > >> > > > overload for each one, taking us up to 4 overloads each for
> > "count",
> > >> > > > "aggregate" and "reduce". Using "suppress" means that we don't
> add
> > >> any
> > >> > > new
> > >> > > > overloads.
> > >> > > >
> > >> > > > Thanks again for helping to hash this out,
> > >> > > > -John
> > >> > > >
> > >> > > > On Fri, Jul 6, 2018 at 6:20 PM Guozhang Wang <
> wangg...@gmail.com>
> > >> > wrote:
> > >> > > >
> > >> > > > > I think I agree with Matthias for having dedicated APIs for
> > >> windowed
> > >> > > > > operation final output scenario, PLUS separating the window
> > close
> > >> > which
> > >> > > > the
> > >> > > > > "final output" would rely on, from the window retention time
> > >> itself
> > >> > > > > (admittedly it would make this KIP effort larger, but if we
> > >> believe
> > >> > we
> > >> > > > need
> > >> > > > > to do this separation anyways we could just do it now).
> > >> > > > >
> > >> > > > > And then we can have the `KTable#suppress()` for
> > >> > > intermediate-suppression
> > >> > > > > only, not for late-record-suppression, until we've seen that
> > >> becomes
> > >> > a
> > >> > > > > common feature request because our current design still allows
> > to
> > >> be
> > >> > > > > extended for that purpose.
> > >> > > > >
> > >> > > > >
> > >> > > > > Guozhang
> > >> > > > >
> > >> > > > > On Wed, Jul 4, 2018 at 12:53 PM, Matthias J. Sax <
> > >> > > matth...@confluent.io>
> > >> > > > > wrote:
> > >> > > > >
> > >> > > > > > Thanks for the discussion. I am just catching up.
> > >> > > > > >
> > >> > > > > > In general, I think we have different uses cases and
> > >> non-windowed
> > >> > and
> > >> > > > > > windowed is quite different. For the non-windowed case,
> > >> suppress()
> > >> > > has
> > >> > > > > > no (useful) close or retention time, no final semantics, and
> > >> also
> > >> > no
> > >> > > > > > business logic impact.
> > >> > > > > >
> > >> > > > > > On the other hand, for windowed aggregations, close time and
> > >> final
> > >> > > > > > result do have a meaning. IMHO, `close()` is part of
> business
> > >> logic
> > >> > > > > > while retention time is not. Also, suppression of
> intermediate
> > >> > result
> > >> > > > is
> > >> > > > > > not a business rule and there might be use case for which
> > either
> > >> > > "early
> > >> > > > > > intermediate" (before window end time) are suppressed only,
> or
> > >> all
> > >> > > > > > intermediates are suppressed (maybe also something in the
> > >> middle,
> > >> > ie,
> > >> > > > > > just reduce the load of intermediate updates). Thus,
> > >> > > window-suppression
> > >> > > > > > is much richer.
> > >> > > > > >
> > >> > > > > > IMHO, a generic `suppress()` operator that can be inserted
> > into
> > >> the
> > >> > > > data
> > >> > > > > > flow at any point is useful. Maybe we should keep is as
> > generic
> > >> as
> > >> > > > > > possible. However, it might be difficult to use with regard
> to
> > >> > > > > > windowing, as the mental effort to use it is high.
> > >> > > > > >
> > >> > > > > > With regard to Guozhang's comment:
> > >> > > > > >
> > >> > > > > > > we will actually
> > >> > > > > > > process data as old as 30 days as well, while most of the
> > late
> > >> > > > updates
> > >> > > > > > > beyond 5 minutes would be discarded anyways.
> > >> > > > > >
> > >> > > > > > If we use `suppress()` as a standalone operator, this is
> > correct
> > >> > and
> > >> > > > > > intended IMHO. To address the issue if the behavior is
> > >> unwanted, I
> > >> > > > would
> > >> > > > > > suggest to add a "suppress option" directly to
> > >> > > > > > `count()/reduce()/aggregate()` window operator similar to
> > >> > > > > > `Materialized`. This would be an "embedded suppress" and
> avoid
> > >> the
> > >> > > > > > issue. It would also address the issue about mental effort
> for
> > >> > > "single
> > >> > > > > > final window result" use case.
> > >> > > > > >
> > >> > > > > > I also think that a shorter close-time than retention time
> is
> > >> > useful
> > >> > > > for
> > >> > > > > > window aggregation. If we add close() to the window
> definition
> > >> and
> > >> > > > > > until() to `Materialized`, we can separate both correctly
> > IMHO.
> > >> > > > > >
> > >> > > > > > About setting `close = min(close,retention)` I am not sure.
> We
> > >> > might
> > >> > > > > > rather throw an exception than reducing the close time
> > >> > automatically.
> > >> > > > > > Otherwise, I see many user question about "I set close to X
> > but
> > >> it
> > >> > > does
> > >> > > > > > not get updated for some data that is with delay of X".
> > >> > > > > >
> > >> > > > > > The tricky question might be to design the API in a backward
> > >> > > compatible
> > >> > > > > > way though.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > -Matthias
> > >> > > > > >
> > >> > > > > > On 7/3/18 5:38 AM, John Roesler wrote:
> > >> > > > > > > Hi Guozhang,
> > >> > > > > > >
> > >> > > > > > > I see. It seems like if we want to decouple 1) and 2), we
> > >> need to
> > >> > > > alter
> > >> > > > > > the
> > >> > > > > > > definition of the window. Do you think it would close the
> > gap
> > >> if
> > >> > we
> > >> > > > > > added a
> > >> > > > > > > "window close" time to the window definition?
> > >> > > > > > >
> > >> > > > > > > Such as:
> > >> > > > > > >
> > >> > > > > > > builder.stream("input")
> > >> > > > > > > .groupByKey()
> > >> > > > > > > .windowedBy(
> > >> > > > > > >   TimeWindows
> > >> > > > > > >     .of(60_000)
> > >> > > > > > >     .closeAfter(10 * 60)
> > >> > > > > > >     .until(30L * 24 * 60 * 60 * 1000)
> > >> > > > > > > )
> > >> > > > > > > .count()
> > >> > > > > > > .suppress(Suppression.finalResultsOnly());
> > >> > > > > > >
> > >> > > > > > > Possibly called "finalResultsAtWindowClose" or something?
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > -John
> > >> > > > > > >
> > >> > > > > > > On Mon, Jul 2, 2018 at 6:50 PM Guozhang Wang <
> > >> wangg...@gmail.com
> > >> > >
> > >> > > > > wrote:
> > >> > > > > > >
> > >> > > > > > >> Hey John,
> > >> > > > > > >>
> > >> > > > > > >> Obviously I'm too lazy on email replying diligence
> compared
> > >> with
> > >> > > you
> > >> > > > > :)
> > >> > > > > > >> Will try to reply them separately:
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> ------------------------------
> > ------------------------------
> > >> > > > > > -----------------
> > >> > > > > > >>
> > >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 8:23 AM":
> > >> > > > > > >>
> > >> > > > > > >> I'm aware of this use case, but again, the concern is
> that,
> > >> in
> > >> > > this
> > >> > > > > > setting
> > >> > > > > > >> in order to let the window be queryable for 30 days, we
> > will
> > >> > > > actually
> > >> > > > > > >> process data as old as 30 days as well, while most of the
> > >> late
> > >> > > > updates
> > >> > > > > > >> beyond 5 minutes would be discarded anyways. Personally I
> > >> think
> > >> > > for
> > >> > > > > the
> > >> > > > > > >> final update scenario, the ideal situation users would
> want
> > >> is
> > >> > > that
> > >> > > > > "do
> > >> > > > > > not
> > >> > > > > > >> process any data that is less than 5 minutes, and of
> course
> > >> no
> > >> > > > update
> > >> > > > > > >> records to the downstream later than 5 minutes either;
> but
> > >> > retain
> > >> > > > the
> > >> > > > > > >> window to be queryable for 30 days". And by doing that
> the
> > >> final
> > >> > > > > window
> > >> > > > > > >> snapshot would also be aligned with the update stream as
> > >> well.
> > >> > In
> > >> > > > > other
> > >> > > > > > >> words, among these three periods:
> > >> > > > > > >>
> > >> > > > > > >> 1) the retention length of the window / table.
> > >> > > > > > >> 2) the late records acceptance for updating the window.
> > >> > > > > > >> 3) the late records update to be sent downstream.
> > >> > > > > > >>
> > >> > > > > > >> Final update use cases would naturally want 2) = 3),
> while
> > 1)
> > >> > may
> > >> > > be
> > >> > > > > > >> different and larger, while what we provide now is that
> 1)
> > =
> > >> 2),
> > >> > > > which
> > >> > > > > > >> could be different and in practice larger than 3), hence
> > not
> > >> the
> > >> > > > most
> > >> > > > > > >> intuitive for their needs.
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> ------------------------------
> > ------------------------------
> > >> > > > > > -----------------
> > >> > > > > > >>
> > >> > > > > > >> To reply your email on "Mon, Jul 2, 2018 at 10:27 AM":
> > >> > > > > > >>
> > >> > > > > > >> I'd like option 2) over option 1) better as well from
> > >> > programming
> > >> > > > pov.
> > >> > > > > > But
> > >> > > > > > >> I'm wondering if option 2) would provide the above
> > semantics
> > >> or
> > >> > it
> > >> > > > is
> > >> > > > > > still
> > >> > > > > > >> coupling 1) with 2) as well ?
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> Guozhang
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >>
> > >> > > > > > >> On Mon, Jul 2, 2018 at 1:08 PM, John Roesler <
> > >> j...@confluent.io
> > >> > >
> > >> > > > > wrote:
> > >> > > > > > >>
> > >> > > > > > >>> In fact, to push the idea further (which IIRC is what
> > >> Matthias
> > >> > > > > > originally
> > >> > > > > > >>> proposed), if we can accept
> "Suppression#finalResultsOnly"
> > >> in
> > >> > my
> > >> > > > last
> > >> > > > > > >>> email, then we could also consider whether to eliminate
> > >> > > > > > >>> "suppressLateEvents" entirely.
> > >> > > > > > >>>
> > >> > > > > > >>> We could always add it later, but you've both expressed
> > >> doubt
> > >> > > that
> > >> > > > > > there
> > >> > > > > > >>> are practical use cases for it outside of final-results.
> > >> > > > > > >>>
> > >> > > > > > >>> -John
> > >> > > > > > >>>
> > >> > > > > > >>> On Mon, Jul 2, 2018 at 12:27 PM John Roesler <
> > >> > j...@confluent.io>
> > >> > > > > > wrote:
> > >> > > > > > >>>
> > >> > > > > > >>>> Hi again, Guozhang ;) Here's the second part of my
> > >> response...
> > >> > > > > > >>>>
> > >> > > > > > >>>> It seems like your main concern is: "if I'm a user who
> > >> wants
> > >> > > final
> > >> > > > > > >> update
> > >> > > > > > >>>> semantics, how complicated is it for me to get it?"
> > >> > > > > > >>>>
> > >> > > > > > >>>> I think we have to assume that people don't always have
> > >> time
> > >> > to
> > >> > > > > become
> > >> > > > > > >>>> deeply familiar with all the nuances of a programming
> > >> > > environment
> > >> > > > > > >> before
> > >> > > > > > >>>> they use it. Especially if they're evaluating several
> > >> > frameworks
> > >> > > > for
> > >> > > > > > >>> their
> > >> > > > > > >>>> use case, it's very valuable to make it as obvious as
> > >> possible
> > >> > > how
> > >> > > > > to
> > >> > > > > > >>>> accomplish various computations with Streams.
> > >> > > > > > >>>>
> > >> > > > > > >>>> To me the biggest question is whether with a fresh
> > >> > perspective,
> > >> > > > > people
> > >> > > > > > >>>> would say "oh, I get it, I have to bound my lateness
> and
> > >> > > suppress
> > >> > > > > > >>>> intermediate updates, and of course I'll get only the
> > final
> > >> > > > > result!",
> > >> > > > > > >> or
> > >> > > > > > >>> if
> > >> > > > > > >>>> it's more like "wtf? all I want is the final result,
> what
> > >> are
> > >> > > all
> > >> > > > > > these
> > >> > > > > > >>>> parameters?".
> > >> > > > > > >>>>
> > >> > > > > > >>>> I was talking with Matthias a while back, and he had an
> > >> idea
> > >> > > that
> > >> > > > I
> > >> > > > > > >> think
> > >> > > > > > >>>> can help, which is to essentially set up a final-result
> > >> recipe
> > >> > > in
> > >> > > > > > >>> addition
> > >> > > > > > >>>> to the raw parameters. I previously thought that it
> > >> wouldn't
> > >> > be
> > >> > > > > > >> possible
> > >> > > > > > >>> to
> > >> > > > > > >>>> restrict its usage to Windowed KTables, but thinking
> > about
> > >> it
> > >> > > > again
> > >> > > > > > >> this
> > >> > > > > > >>>> weekend, I have a couple of ideas:
> > >> > > > > > >>>>
> > >> > > > > > >>>> ================
> > >> > > > > > >>>> = 1. Static Wrapper =
> > >> > > > > > >>>> ================
> > >> > > > > > >>>> We can define an extra static function that "wraps" a
> > >> KTable
> > >> > > with
> > >> > > > > > >>>> final-result semantics.
> > >> > > > > > >>>>
> > >> > > > > > >>>> public static <K extends Windowed, V> KTable<K, V>
> > >> > > > finalResultsOnly(
> > >> > > > > > >>>>   final KTable<K, V> windowedKTable,
> > >> > > > > > >>>>   final Duration maxAllowedLateness,
> > >> > > > > > >>>>   final Suppression.BufferFullStrategy
> > bufferFullStrategy)
> > >> {
> > >> > > > > > >>>>     return windowedKTable.suppress(
> > >> > > > > > >>>>         Suppression.suppressLateEvents(
> > maxAllowedLateness)
> > >> > > > > > >>>>                    .suppressIntermediateEvents(
> > >> > > > > > >>>>                      IntermediateSuppression
> > >> > > > > > >>>>                        .emitAfter(maxAllowedLateness)
> > >> > > > > > >>>>                        .bufferFullStrategy(
> > >> > bufferFullStrategy)
> > >> > > > > > >>>>                    )
> > >> > > > > > >>>>     );
> > >> > > > > > >>>> }
> > >> > > > > > >>>>
> > >> > > > > > >>>> Because windowedKTable is a parameter, the static
> > function
> > >> can
> > >> > > > > easily
> > >> > > > > > >>>> impose an extra bound on the key type, that it extends
> > >> > Windowed.
> > >> > > > > This
> > >> > > > > > >>> would
> > >> > > > > > >>>> make "final results only" only available on windowed
> > >> ktables.
> > >> > > > > > >>>>
> > >> > > > > > >>>> Here's how it would look to use:
> > >> > > > > > >>>>
> > >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts =
> ...
> > >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > >> > > > > > >>>>   finalResultsOnly(
> > >> > > > > > >>>>     windowCounts,
> > >> > > > > > >>>>     Duration.ofMinutes(10),
> > >> > > > > > >>>>     Suppression.BufferFullStrategy.SHUT_DOWN
> > >> > > > > > >>>>   );
> > >> > > > > > >>>>
> > >> > > > > > >>>> Trying to use it on a non-windowed KTable yields:
> > >> > > > > > >>>>
> > >> > > > > > >>>>> Error:(129, 35) java: method finalResultsOnly in class
> > >> > > > > > >>>>> org.apache.kafka.streams.kstream.internals.
> > >> > KTableAggregateTest
> > >> > > > > > cannot
> > >> > > > > > >>> be
> > >> > > > > > >>>>> applied to given types;
> > >> > > > > > >>>>>   required:
> > >> > > > > > >>>>>
> org.apache.kafka.streams.kstream.KTable<K,V>,java.time.
> > >> > > > > > >>> Duration,org.apache.kafka.streams.kstream.Suppression.
> > >> > > > > > BufferFullStrategy
> > >> > > > > > >>>>>   found:
> > >> > > > > > >>>>> org.apache.kafka.streams.kstream.KTable<java.lang.
> > >> > > > > > >>> String,java.lang.String>,java.time.Duration,org.apache.
> > >> > > > > > >>> kafka.streams.kstream.Suppression.BufferFullStrategy
> > >> > > > > > >>>>>   reason: inference variable K has incompatible bounds
> > >> > > > > > >>>>>     equality constraints: java.lang.String
> > >> > > > > > >>>>>     upper bounds:
> > >> org.apache.kafka.streams.kstream.Windowed
> > >> > > > > > >>>>
> > >> > > > > > >>>>
> > >> > > > > > >>>>
> > >> > > > > > >>>> =================================================
> > >> > > > > > >>>> = 2. Add <K,V> parameters and recipe method to
> > Suppression
> > >> =
> > >> > > > > > >>>> =================================================
> > >> > > > > > >>>>
> > >> > > > > > >>>> By adding K,V parameters to Suppression, we can
> provide a
> > >> > > > similarly
> > >> > > > > > >>>> bounded config method directly on the Suppression
> class:
> > >> > > > > > >>>>
> > >> > > > > > >>>> public static <K extends Windowed, V> Suppression<K, V>
> > >> > > > > > >>>> finalResultsOnly(final Duration maxAllowedLateness,
> final
> > >> > > > > > >>>> BufferFullStrategy bufferFullStrategy) {
> > >> > > > > > >>>>     return Suppression
> > >> > > > > > >>>>         .<K, V>suppressLateEvents(maxAllowedLateness)
> > >> > > > > > >>>>         .suppressIntermediateEvents(
> > IntermediateSuppression
> > >> > > > > > >>>>             .emitAfter(maxAllowedLateness)
> > >> > > > > > >>>>             .bufferFullStrategy(bufferFullStrategy)
> > >> > > > > > >>>>         );
> > >> > > > > > >>>> }
> > >> > > > > > >>>>
> > >> > > > > > >>>> Then, here's how it would look to use it:
> > >> > > > > > >>>>
> > >> > > > > > >>>> final KTable<Windowed<Integer>, Long> windowCounts =
> ...
> > >> > > > > > >>>> final KTable<Windowed<Integer>, Long> finalCounts =
> > >> > > > > > >>>>   windowCounts.suppress(
> > >> > > > > > >>>>     Suppression.finalResultsOnly(
> > >> > > > > > >>>>       Duration.ofMinutes(10)
> > >> > > > > > >>>>       Suppression.BufferFullStrategy.SHUT_DOWN
> > >> > > > > > >>>>     )
> > >> > > > > > >>>>   );
> > >> > > > > > >>>>
> > >> > > > > > >>>> Trying to use it on a non-windowed ktable yields:
> > >> > > > > > >>>>
> > >> > > > > > >>>>> Error:(127, 35) java: method finalResultsOnly in class
> > >> > > > > > >>>>> org.apache.kafka.streams.kstream.Suppression<K,V>
> > cannot
> > >> be
> > >> > > > applied
> > >> > > > > > to
> > >> > > > > > >>>>> given types;
> > >> > > > > > >>>>>   required:
> > >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.
> > >> > > > > > >>> Suppression.BufferFullStrategy
> > >> > > > > > >>>>>   found:
> > >> > > > > > >>>>> java.time.Duration,org.apache.kafka.streams.kstream.
> > >> > > > > > >>> Suppression.BufferFullStrategy
> > >> > > > > > >>>>>   reason: explicit type argument java.lang.String does
> > not
> > >> > > > conform
> > >> > > > > to
> > >> > > > > > >>>>> declared bound(s)
> > >> org.apache.kafka.streams.kstream.Windowed
> > >> > > > > > >>>>
> > >> > > > > > >>>>
> > >> > > > > > >>>>
> > >> > > > > > >>>> ============
> > >> > > > > > >>>> = Downsides =
> > >> > > > > > >>>> ============
> > >> > > > > > >>>>
> > >> > > > > > >>>> Of course, there's a downside either way:
> > >> > > > > > >>>> * for 1:  this "wrapper" interaction would be the first
> > in
> > >> the
> > >> > > > DSL.
> > >> > > > > Is
> > >> > > > > > >> it
> > >> > > > > > >>>> too strange, and how discoverable would it be?
> > >> > > > > > >>>> * for 2: adding those type parameters to Suppression
> will
> > >> > force
> > >> > > > all
> > >> > > > > > >>>> callers to provide them in the event of a chained
> > >> construction
> > >> > > > > because
> > >> > > > > > >>> Java
> > >> > > > > > >>>> doesn't do RHS recursive type inference. This is
> already
> > >> > visible
> > >> > > > in
> > >> > > > > > >> other
> > >> > > > > > >>>> parts of the Streams DSL. For example, often calls to
> > >> > > Materialized
> > >> > > > > > >>> builders
> > >> > > > > > >>>> have to provide seemingly obvious type bounds.
> > >> > > > > > >>>>
> > >> > > > > > >>>> ============
> > >> > > > > > >>>> = Conclusion =
> > >> > > > > > >>>> ============
> > >> > > > > > >>>>
> > >> > > > > > >>>> I think option 2 is more "normal" and discoverable. It
> > does
> > >> > > have a
> > >> > > > > > >>>> downside, but it's one that's pre-existing elsewhere in
> > the
> > >> > DSL.
> > >> > > > > > >>>>
> > >> > > > > > >>>> WDYT? Would the addition of this "recipe" method to
> > >> > Suppression
> > >> > > > > > resolve
> > >> > > > > > >>>> your concern?
> > >> > > > > > >>>>
> > >> > > > > > >>>> Thanks again,
> > >> > > > > > >>>> -John
> > >> > > > > > >>>>
> > >> > > > > > >>>> On Sun, Jul 1, 2018 at 11:24 PM Guozhang Wang <
> > >> > > wangg...@gmail.com
> > >> > > > >
> > >> > > > > > >>> wrote:
> > >> > > > > > >>>>
> > >> > > > > > >>>>> Hi John,
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Regarding the metrics: yeah I think I'm with you that
> > the
> > >> > > dropped
> > >> > > > > > >>> records
> > >> > > > > > >>>>> due to window retention or emit suppression policies
> > >> should
> > >> > be
> > >> > > > > > >> recorded
> > >> > > > > > >>>>> differently, and using this KIP's proposed metric
> would
> > be
> > >> > > fine.
> > >> > > > If
> > >> > > > > > >> you
> > >> > > > > > >>>>> also think we can use this KIP's proposed metrics to
> > cover
> > >> > the
> > >> > > > > window
> > >> > > > > > >>>>> retention cased skipping records, then we can include
> > the
> > >> > > changes
> > >> > > > > in
> > >> > > > > > >>> this
> > >> > > > > > >>>>> KIP as well.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Regarding the current proposal, I'm actually not too
> > >> worried
> > >> > > > about
> > >> > > > > > the
> > >> > > > > > >>>>> inconsistency between query semantics and downstream
> > emit
> > >> > > > > semantics.
> > >> > > > > > >> For
> > >> > > > > > >>>>> queries, we will always return the current running
> > >> results of
> > >> > > the
> > >> > > > > > >>> windows,
> > >> > > > > > >>>>> being it partial or final results depending on the
> > window
> > >> > > > retention
> > >> > > > > > >> time
> > >> > > > > > >>>>> anyways, which has nothing to do whether the emitted
> > >> stream
> > >> > > > should
> > >> > > > > be
> > >> > > > > > >>> one
> > >> > > > > > >>>>> final output per key or not. I also agree that having
> a
> > >> > unified
> > >> > > > > > >>> operation
> > >> > > > > > >>>>> is generally better for users to focus on leveraging
> > that
> > >> one
> > >> > > > only
> > >> > > > > > >> than
> > >> > > > > > >>>>> learning about two set of operations. The only
> question
> > I
> > >> had
> > >> > > is,
> > >> > > > > for
> > >> > > > > > >>>>> final
> > >> > > > > > >>>>> updates of window stores, if it is a bit awkward to
> > >> > understand
> > >> > > > the
> > >> > > > > > >>>>> configuration combo. Thinking about this more, I think
> > my
> > >> > root
> > >> > > > > worry
> > >> > > > > > >> in
> > >> > > > > > >>>>> the
> > >> > > > > > >>>>> "suppressLateEvents" call for windowed tables, since
> > from
> > >> a
> > >> > > user
> > >> > > > > > >>>>> perspective: if my retention time is X which means
> "pay
> > >> the
> > >> > > cost
> > >> > > > to
> > >> > > > > > >>> allow
> > >> > > > > > >>>>> late records up to X to still be applied updating the
> > >> > tables",
> > >> > > > why
> > >> > > > > > >>> would I
> > >> > > > > > >>>>> ever want to suppressLateEvents by Y ( < X), to say
> "do
> > >> not
> > >> > > send
> > >> > > > > the
> > >> > > > > > >>>>> updates up to Y, which means the downstream operator
> or
> > >> sink
> > >> > > > topic
> > >> > > > > > for
> > >> > > > > > >>>>> this
> > >> > > > > > >>>>> stream would actually see a truncated update stream
> > while
> > >> > I've
> > >> > > > paid
> > >> > > > > > >>> larger
> > >> > > > > > >>>>> cost for that"; and of course, Y > X would not make
> > sense
> > >> > > either
> > >> > > > as
> > >> > > > > > >> you
> > >> > > > > > >>>>> would not see any updates later than X anyways. So in
> > >> all, my
> > >> > > > > feeling
> > >> > > > > > >> is
> > >> > > > > > >>>>> that it makes less sense for windowed table's
> > >> > > > "suppressLateEvents"
> > >> > > > > > >> with
> > >> > > > > > >>> a
> > >> > > > > > >>>>> parameter that is not equal to the window retention,
> and
> > >> > > opening
> > >> > > > > the
> > >> > > > > > >>> door
> > >> > > > > > >>>>> in the current proposal may confuse people with that.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Again, above is just a subjective opinion and probably
> > we
> > >> can
> > >> > > > also
> > >> > > > > > >> bring
> > >> > > > > > >>>>> up
> > >> > > > > > >>>>> some scenarios that users does want to set X != Y..
> but
> > >> > > > personally
> > >> > > > > I
> > >> > > > > > >>> feel
> > >> > > > > > >>>>> that even if the semantics for this scenario if
> > intuitive
> > >> for
> > >> > > > user
> > >> > > > > to
> > >> > > > > > >>>>> understand, doe that really make sense and should we
> > >> really
> > >> > > open
> > >> > > > > the
> > >> > > > > > >>> door
> > >> > > > > > >>>>> for it. So I think maybe separating the final update
> in
> > a
> > >> > > > separate
> > >> > > > > > >> API's
> > >> > > > > > >>>>> benefits may overwhelm the advantage of having one
> > uniform
> > >> > > > > > definition.
> > >> > > > > > >>> And
> > >> > > > > > >>>>> for my alternative proposal, the rationale was from
> both
> > >> my
> > >> > > > concern
> > >> > > > > > >>> about
> > >> > > > > > >>>>> "suppressLateEvents" for windowed store, and Matthias'
> > >> > question
> > >> > > > > about
> > >> > > > > > >>>>> "suppressLateEvents" for non-windowed stores, that if
> it
> > >> is
> > >> > > less
> > >> > > > > > >>>>> meaningful
> > >> > > > > > >>>>> for both, we can consider removing it completely and
> > only
> > >> do
> > >> > > > > > >>>>> "IntermediateSuppression" in Suppress instead.
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> So I'd summarize my thoughts in the following
> questions:
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> 1. Does "suppressLateEvents" with parameter Y != X
> > (window
> > >> > > > > retention
> > >> > > > > > >>> time)
> > >> > > > > > >>>>> for windowed stores make sense in practice?
> > >> > > > > > >>>>> 2. Does "suppressLateEvents" with any parameter Y for
> > >> > > > non-windowed
> > >> > > > > > >>> stores
> > >> > > > > > >>>>> make sense in practice?
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> Guozhang
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>
> > >> > > > > > >>>>> On Fri, Jun 29, 2018 at 2:26 PM, Bill Bejeck <
> > >> > > bbej...@gmail.com>
> > >> > > > > > >> wrote:
> > >> > > > > > >>>>>
> > >> > > > > > >>>>>> Thanks for the explanation, that does make sense.  I
> > have
> > >> > some
> > >> > > > > > >>>>> questions on
> > >> > > > > > >>>>>> operations, but I'll just wait for the PR and tests.
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> Thanks,
> > >> > > > > > >>>>>> Bill
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>> On Wed, Jun 27, 2018 at 8:14 PM John Roesler <
> > >> > > j...@confluent.io
> > >> > > > >
> > >> > > > > > >>> wrote:
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>> Hi Bill,
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks for the review!
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Your question is very much applicable to the KIP and
> > >> not at
> > >> > > all
> > >> > > > > an
> > >> > > > > > >>>>>>> implementation detail. Thanks for bringing it up.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> I'm proposing not to change the existing caches and
> > >> > > > > configurations
> > >> > > > > > >>> at
> > >> > > > > > >>>>> all
> > >> > > > > > >>>>>>> (for now).
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Imagine you have a topology like this:
> > >> > > > > > >>>>>>> commit.interval.ms = 100
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> (ktable1 (cached)) -> (suppress emitAfter 200)
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> The first ktable (ktable1) will respect the commit
> > >> interval
> > >> > > and
> > >> > > > > > >>> buffer
> > >> > > > > > >>>>>>> events for 100ms before logging, storing, or
> > forwarding
> > >> > them
> > >> > > > > > >> (IIRC).
> > >> > > > > > >>>>>>> Therefore, the second ktable (suppress) will only
> see
> > >> the
> > >> > > > events
> > >> > > > > > >> at
> > >> > > > > > >>> a
> > >> > > > > > >>>>>> rate
> > >> > > > > > >>>>>>> of once per 100ms. It will apply its own buffering,
> > and
> > >> > emit
> > >> > > > once
> > >> > > > > > >>> per
> > >> > > > > > >>>>>> 200ms
> > >> > > > > > >>>>>>> This case is pretty trivial because the suppress
> time
> > >> is a
> > >> > > > > > >> multiple
> > >> > > > > > >>> of
> > >> > > > > > >>>>>> the
> > >> > > > > > >>>>>>> commit interval.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> When it's not an integer multiple, you'll get
> behavior
> > >> like
> > >> > > in
> > >> > > > > > >> this
> > >> > > > > > >>>>>> marble
> > >> > > > > > >>>>>>> diagram:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> <-(k:1)--(k:2)--(k:3)--(k:4)--(k:5)--(k:6)->
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> [ KTable caching with commit interval = 2 ]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> <--------(k:2)---------(k:4)---------(k:6)->
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>       [ suppress with emitAfter = 3 ]
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> <---------------(k:2)----------------(k:6)->
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> If this behavior isn't desired (for example, if you
> > >> wanted
> > >> > to
> > >> > > > > emit
> > >> > > > > > >>>>> (k:3)
> > >> > > > > > >>>>>> at
> > >> > > > > > >>>>>>> time 3, I'd recommend setting the
> > >> > "cache.max.bytes.buffering"
> > >> > > > to
> > >> > > > > 0
> > >> > > > > > >>> or
> > >> > > > > > >>>>>>> modifying the topology to disable caching. Then, the
> > >> > behavior
> > >> > > > is
> > >> > > > > > >>> more
> > >> > > > > > >>>>>>> simply determined just by the suppress operator.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Does that seem right to you?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Regarding the changelogs, because the suppression
> > >> operator
> > >> > > > hangs
> > >> > > > > > >>> onto
> > >> > > > > > >>>>>>> events for a while, it will need its own changelog.
> > The
> > >> > > > changelog
> > >> > > > > > >>>>>>> should represent the current state of the buffer at
> > all
> > >> > > times.
> > >> > > > So
> > >> > > > > > >>> when
> > >> > > > > > >>>>>> the
> > >> > > > > > >>>>>>> suppress operator sees (k:2), for example, it will
> log
> > >> > (k:2).
> > >> > > > > When
> > >> > > > > > >>> it
> > >> > > > > > >>>>>>> later gets to time 3, it's time to emit (k:2)
> > >> downstream.
> > >> > > > Because
> > >> > > > > > >> k
> > >> > > > > > >>>>> is no
> > >> > > > > > >>>>>>> longer buffered, the suppress operator will log
> > >> (k:null).
> > >> > > Thus,
> > >> > > > > > >> when
> > >> > > > > > >>>>>>> recovering,
> > >> > > > > > >>>>>>> it can rebuild the buffer by reading its changelog.
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> What do you think about this?
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> Thanks,
> > >> > > > > > >>>>>>> -John
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>> On Wed, Jun 27, 2018 at 4:16 PM Bill Bejeck <
> > >> > > bbej...@gmail.com
> > >> > > > >
> > >> > > > > > >>>>> wrote:
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>> Hi John,  thanks for the KIP.
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>>> Early on in the KIP, you mention the current
> > approaches
> > >> > for
> > >> > > > > > >>>>> controlling
> > >> > > > > > >>>>>>> the
> > >> > > > > > >>>>>>>> rate of downstream records from a KTable, cache
> size
> > >> > > > > > >> configuration
> > >> > > > > > >>>>> and
> > >> > > > > > >>>>>>>> commit time.
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>>> Will these configuration parameters still be in
> > effect
> > >> for
> > >> > > > > > >> tables
> > >> > > > > > >>>>> that
> > >> > > > > > >>>>>>>> don't use suppression?  For tables taking advantage
> > of
> > >> > > > > > >>> suppression,
> > >> > > > > > >>>>>> will
> > >> > > > > > >>>>>>>> these configurations have no impact?
> > >> > > > > > >>>>>>>> This last question may be to implementation
> specific
> > >> but
> > >> > if
> > >> > > > the
> > >> > > > > > >>>>>> requested
> > >> > > > > > >>>>>>>> suppression time is longer than the specified
> commit
> > >> time,
> > >> > > > will
> > >> > > > > > >>> the
> > >> > > > > > >>>>>>> latest
> > >> > > > > > >>>>>>>> record in the suppression buffer get stored in a
> > >> > changelog?
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>>> Thanks,
> > >> > > > > > >>>>>>>> Bill
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>>> On Wed, Jun 27, 2018 at 3:04 PM John Roesler <
> > >> > > > j...@confluent.io
> > >> > > > > > >>>
> > >> > > > > > >>>>>> wrote:
> > >> > > > > > >>>>>>>>
> > >> > > > > > >>>>>>>>> Thanks for the feedback, Matthias,
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> It seems like in straightforward relational
> > processing
> > >> > > cases,
> > >> > > > > > >> it
> > >> > > > > > >>>>>> would
> > >> > > > > > >>>>>>>> not
> > >> > > > > > >>>>>>>>> make sense to bound the lateness of KTables. In
> > >> general,
> > >> > it
> > >> > > > > > >>> seems
> > >> > > > > > >>>>>>> better
> > >> > > > > > >>>>>>>> to
> > >> > > > > > >>>>>>>>> have "guard rails" in place that make it easier to
> > >> write
> > >> > > > > > >>> sensible
> > >> > > > > > >>>>>>>> programs
> > >> > > > > > >>>>>>>>> than insensible ones.
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> But I'm still going to argue in favor of keeping
> it
> > >> for
> > >> > all
> > >> > > > > > >>>>> KTables
> > >> > > > > > >>>>>> ;)
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> 1. I believe it is simpler to understand the
> > operator
> > >> if
> > >> > it
> > >> > > > > > >> has
> > >> > > > > > >>>>> one
> > >> > > > > > >>>>>>>> uniform
> > >> > > > > > >>>>>>>>> definition, regardless of context. It's well
> defined
> > >> and
> > >> > > > > > >>> intuitive
> > >> > > > > > >>>>>> what
> > >> > > > > > >>>>>>>>> will happen when you use late-event suppression
> on a
> > >> > > KTable,
> > >> > > > > > >> so
> > >> > > > > > >>> I
> > >> > > > > > >>>>>> think
> > >> > > > > > >>>>>>>>> nothing surprising or dangerous will happen in
> that
> > >> case.
> > >> > > > From
> > >> > > > > > >>> my
> > >> > > > > > >>>>>>>>> perspective, having two sets of allowed operations
> > is
> > >> > > > actually
> > >> > > > > > >>> an
> > >> > > > > > >>>>>>>> increase
> > >> > > > > > >>>>>>>>> in cognitive complexity.
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> 2. To me, it's not crazy to use the operator this
> > way.
> > >> > For
> > >> > > > > > >>>>> example,
> > >> > > > > > >>>>>> in
> > >> > > > > > >>>>>>>> lieu
> > >> > > > > > >>>>>>>>> of full-featured timestamp semantics, I can
> > implement
> > >> > MVCC
> > >> > > > > > >>>>> behavior
> > >> > > > > > >>>>>>> when
> > >> > > > > > >>>>>>>>> building a KTable by
> > >> "suppressLateEvents(Duration.ZERO)".
> > >> > I
> > >> > > > > > >>>>> suspect
> > >> > > > > > >>>>>>> that
> > >> > > > > > >>>>>>>>> there are other, non-obvious applications of
> > >> suppressing
> > >> > > late
> > >> > > > > > >>>>> events
> > >> > > > > > >>>>>> on
> > >> > > > > > >>>>>>>>> KTables.
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> 3. Not to get too much into implementation details
> > in
> > >> a
> > >> > KIP
> > >> > > > > > >>>>>> discussion,
> > >> > > > > > >>>>>>>> but
> > >> > > > > > >>>>>>>>> if we did want to make late-event suppression
> > >> available
> > >> > > only
> > >> > > > > > >> on
> > >> > > > > > >>>>>>> windowed
> > >> > > > > > >>>>>>>>> KTables, we have two enforcement options:
> > >> > > > > > >>>>>>>>>   a. check when we build the topology - this would
> > be
> > >> > > simple
> > >> > > > > > >> to
> > >> > > > > > >>>>>>>> implement,
> > >> > > > > > >>>>>>>>> but would be a runtime check. Hopefully, people
> > write
> > >> > tests
> > >> > > > > > >> for
> > >> > > > > > >>>>> their
> > >> > > > > > >>>>>>>>> topology before deploying them, so the feedback
> loop
> > >> > isn't
> > >> > > > > > >>>>>>> instantaneous,
> > >> > > > > > >>>>>>>>> but it's not too long either.
> > >> > > > > > >>>>>>>>>   b. add a new WindowedKTable type - this would
> be a
> > >> > > compile
> > >> > > > > > >>> time
> > >> > > > > > >>>>>>> check,
> > >> > > > > > >>>>>>>>> but would also be substantial increase of both
> > >> interface
> > >> > > and
> > >> > > > > > >>> code
> > >> > > > > > >>>>>>>>> complexity.
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> We should definitely strive to have guard rails
> > >> > protecting
> > >> > > > > > >>> against
> > >> > > > > > >>>>>>>>> surprising or dangerous behavior. Protecting
> against
> > >> > > programs
> > >> > > > > > >>>>> that we
> > >> > > > > > >>>>>>>> don't
> > >> > > > > > >>>>>>>>> currently predict is a lesser benefit, and I think
> > we
> > >> can
> > >> > > put
> > >> > > > > > >> up
> > >> > > > > > >>>>>> guard
> > >> > > > > > >>>>>>>>> rails on a case-by-case basis for that. It seems
> > like
> > >> the
> > >> > > > > > >>>>> increase in
> > >> > > > > > >>>>>>>>> cognitive (and potentially code and interface)
> > >> complexity
> > >> > > > > > >> makes
> > >> > > > > > >>> me
> > >> > > > > > >>>>>>> think
> > >> > > > > > >>>>>>>> we
> > >> > > > > > >>>>>>>>> should skip this case.
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> What do you think?
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> Thanks,
> > >> > > > > > >>>>>>>>> -John
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>> On Wed, Jun 27, 2018 at 11:59 AM Matthias J. Sax <
> > >> > > > > > >>>>>>> matth...@confluent.io>
> > >> > > > > > >>>>>>>>> wrote:
> > >> > > > > > >>>>>>>>>
> > >> > > > > > >>>>>>>>>> Thanks for the KIP John.
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>> One initial comments about the last example
> > "Bounded
> > >> > > > > > >>> lateness":
> > >> > > > > > >>>>>> For a
> > >> > > > > > >>>>>>>>>> non-windowed KTable bounding the lateness does
> not
> > >> > really
> > >> > > > > > >> make
> > >> > > > > > >>>>>> sense,
> > >> > > > > > >>>>>>>>>> does it?
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>> Thus, I am wondering if we should allow
> > >> > > > > > >> `suppressLateEvents()`
> > >> > > > > > >>>>> for
> > >> > > > > > >>>>>>> this
> > >> > > > > > >>>>>>>>>> case? It seems to be better to only allow it for
> > >> > > > > > >>>>> windowed-KTables.
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>> -Matthias
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>>
> > >> > > > > > >>>>>>>>>> On 6/27/18 8:53 AM, Ted Yu wrote:
> > >> > > > > > >>>>>>>>>>> I noticed this (lack of primary parameter) as
> > well.
> > >> > > > > > >>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>> What you gave as new example is semantically the
> > >> same
> > >> > as
> > >> > > > > > >>> what
> > >> > > > > > >>>>> I
> > >> > > > > > >>>>>>>>>> suggested.
> > >> > > > > > >>>>>>>>>>> So it is good by me.
> > >> > > > > > >>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>> Thanks
> > >> > > > > > >>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>> On Wed, Jun 27, 2018 at 7:31 AM, John Roesler <
> > >> > > > > > >>>>> j...@confluent.io
> > >> > > > > > >>>>>>>
> > >> > > > > > >>>>>>>>> wrote:
> > >> > > > > > >>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> Thanks for taking look, Ted,
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> I agree this is a departure from the
> conventions
> > of
> > >> > > > > > >> Streams
> > >> > > > > > >>>>> DSL.
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> Most of our config objects have one or two
> > >> "required"
> > >> > > > > > >>>>>> parameters,
> > >> > > > > > >>>>>>>>> which
> > >> > > > > > >>>>>>>>>> fit
> > >> > > > > > >>>>>>>>>>>> naturally with the static factory method
> > approach.
> > >> > > > > > >>>>> TimeWindow,
> > >> > > > > > >>>>>> for
> > >> > > > > > >>>>>>>>>> example,
> > >> > > > > > >>>>>>>>>>>> requires a size parameter, so we can naturally
> > say
> > >> > > > > > >>>>>>>>> TimeWindows.of(size).
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> I think in the case of a suppression, there's
> > >> really
> > >> > no
> > >> > > > > > >>>>> "core"
> > >> > > > > > >>>>>>>>>> parameter,
> > >> > > > > > >>>>>>>>>>>> and "Suppression.of()" seems sillier than "new
> > >> > > > > > >>>>> Suppression()". I
> > >> > > > > > >>>>>>>> think
> > >> > > > > > >>>>>>>>>> that
> > >> > > > > > >>>>>>>>>>>> Suppression.of(duration) would be ambiguous,
> > since
> > >> > there
> > >> > > > > > >>> are
> > >> > > > > > >>>>>> many
> > >> > > > > > >>>>>>>>>> durations
> > >> > > > > > >>>>>>>>>>>> that we can configure.
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> However, thinking about it again, I suppose
> that
> > I
> > >> can
> > >> > > > > > >> give
> > >> > > > > > >>>>> each
> > >> > > > > > >>>>>>>>>>>> configuration method a static version, which
> > would
> > >> let
> > >> > > > > > >> you
> > >> > > > > > >>>>>> replace
> > >> > > > > > >>>>>>>>> "new
> > >> > > > > > >>>>>>>>>>>> Suppression()." with "Suppression." in all the
> > >> > examples.
> > >> > > > > > >>>>>>> Basically,
> > >> > > > > > >>>>>>>>>> instead
> > >> > > > > > >>>>>>>>>>>> of "of()", we'd support any of the methods I
> > >> listed.
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> For example:
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> windowCounts
> > >> > > > > > >>>>>>>>>>>>     .suppress(
> > >> > > > > > >>>>>>>>>>>>         Suppression
> > >> > > > > > >>>>>>>>>>>>             .suppressLateEvents(Duration.
> > >> > ofMinutes(10))
> > >> > > > > > >>>>>>>>>>>>             .suppressIntermediateEvents(
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>
> > >> IntermediateSuppression.emitAfter(Duration.ofMinutes(
> > >> > 10))
> > >> > > > > > >>>>>>>>>>>>             )
> > >> > > > > > >>>>>>>>>>>>     );
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> Does that seem better?
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> Thanks,
> > >> > > > > > >>>>>>>>>>>> -John
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>> On Wed, Jun 27, 2018 at 12:44 AM Ted Yu <
> > >> > > > > > >>> yuzhih...@gmail.com
> > >> > > > > > >>>>>>
> > >> > > > > > >>>>>>>> wrote:
> > >> > > > > > >>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>>> I started to read this KIP which contains a
> lot
> > of
> > >> > > > > > >>>>> materials.
> > >> > > > > > >>>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>>> One suggestion:
> > >> > > > > > >>>>>>>>>>>>>
> > >> > > > > > >>>>>>>>>>>>>     .suppress(
> > >> > > > > > >>>>>>>>>>>>>         new Suppression()
> > >> > > > > > >>>>>>>>

Reply via email to