Hello all, 

Thanks for the KIP, Leah!

Regarding (1): I'd actually go further. Making Windows an abstract
class was a mistake from the beginning; it left us unable to fix a
very confusing situation for users around retention times, emitting
final results, etc. So I would definitely not suggest extending
TimeWindows, but I would also not suggest extending Windows.
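One concrete symptom, which Matthias and Bruno also raise in the quoted messages, is that Windows obliges every subclass to enumerate a record's windows from its timestamp alone via `windowsFor()`. The toy classes below are simplified stand-ins written only for illustration (ToyWindows and ToyHoppingWindows are hypothetical names, not Kafka types); the hopping-window arithmetic roughly mirrors what TimeWindows does:

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for the contract Windows imposes (not the real Kafka class):
// every subclass must list a record's windows from its timestamp alone.
abstract class ToyWindows {
    // Returns window start -> window end (end exclusive).
    abstract Map<Long, Long> windowsFor(long timestamp);
}

// Hopping windows can answer from the definition alone.
// Assumes sizeMs > 0 and 0 < advanceMs <= sizeMs.
final class ToyHoppingWindows extends ToyWindows {
    private final long sizeMs;
    private final long advanceMs;

    ToyHoppingWindows(long sizeMs, long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }

    @Override
    Map<Long, Long> windowsFor(long timestamp) {
        Map<Long, Long> windows = new TreeMap<>();
        // Earliest advance-aligned start whose window still contains the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestamp; start += advanceMs) {
            windows.put(start, start + sizeMs);
        }
        return windows;
    }
}
```

A sliding window's boundaries depend on which other records have arrived, so there is no meaningful way to fill in `windowsFor(timestamp)` for it.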

The very simplest thing to do is follow the example of SessionWindows,
which is just a completely self-contained class. If we don't mess with
class inheritance, we won't ever have any of the problems related to
class inheritance. This is my preferred solution.

Still, sliding windows have a lot in common with TimeWindows and other
fixed-size windows, namely that the windows are fixed in size. If we want
to preserve the current two-part windowing API, in which you can window
in either "fixed" or "data-driven" mode, I'd suggest we avoid increasing
the blast radius of Windows by taking the opportunity to replace it with
a proper interface and implement that interface instead.

For example:
https://github.com/apache/kafka/pull/9031

Then, SlidingWindows would just implement FixedSizeWindowDefinition.
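A minimal sketch of that shape, assuming the interface exposes a window size and a grace period. These members, the `of(...)` factory, and the WindowRetention helper are hypothetical and may not match the linked PR:

```java
import java.time.Duration;

// Hypothetical shape of the interface; the members in the linked PR may differ.
interface FixedSizeWindowDefinition {
    long sizeMs();        // every window defined by this spec has this length
    long gracePeriodMs(); // how long after a window ends late records are accepted
}

// Implements the interface rather than extending Windows, so it inherits no
// behavior (such as windowsFor()) that does not fit data-driven boundaries.
final class SlidingWindows implements FixedSizeWindowDefinition {
    private final long sizeMs;
    private final long graceMs;

    private SlidingWindows(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Hypothetical factory method; the KIP may choose a different name.
    static SlidingWindows of(Duration size, Duration grace) {
        if (size.isNegative() || size.isZero() || grace.isNegative()) {
            throw new IllegalArgumentException("size must be positive and grace non-negative");
        }
        return new SlidingWindows(size.toMillis(), grace.toMillis());
    }

    @Override public long sizeMs() { return sizeMs; }
    @Override public long gracePeriodMs() { return graceMs; }
}

// Code written against the interface works for any fixed-size definition,
// e.g. the minimum time a window store must retain a window (size + grace).
final class WindowRetention {
    static long minRetentionMs(FixedSizeWindowDefinition definition) {
        return definition.sizeMs() + definition.gracePeriodMs();
    }
}
```

The design benefit is that anything needing only fixed-size semantics can be written once against the interface, while SlidingWindows stays free of inherited behavior it cannot honor.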

======

Regarding (2), it seems more straightforward, as a user of Streams,
to have just one mental model. _All_ of our aggregation operations
follow an eager emission model, in which we emit an update whenever
one is available. We already provide suppression to explicitly apply
different update semantics when that's required. Why should we define
a snowflake operation with completely different semantics from everything
else? In general, systems are easier to use when they follow a few
simple, composable rules than when they have a lot of different, specific
rules.
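To make the composability argument concrete, here is a toy model in plain Java (not Kafka Streams code; the class names, window size, and grace period are made up for illustration). The aggregation emits an update for every accepted record, and a separate stage buffers the latest update per window and emits it once stream time passes window end plus grace, which is roughly what `suppress(Suppressed.untilWindowCloses(...))` provides in the DSL:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, not Kafka Streams code. Tumbling-window counts emit an update
// for every accepted record (eager emission); a separate Suppress stage
// buffers the latest update per window and emits it once the window closes.
final class EmitModel {
    static final long SIZE_MS = 10;
    static final long GRACE_MS = 5;

    // Window [start, start + SIZE_MS) is closed once end + grace <= stream time.
    static boolean isClosed(long windowStart, long streamTime) {
        return windowStart + SIZE_MS + GRACE_MS <= streamTime;
    }

    static final class Aggregate {
        private final Map<Long, Long> counts = new HashMap<>();
        private long streamTime = Long.MIN_VALUE;

        // Returns {windowStart, newCount}, or null if the record is too late.
        long[] process(long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
            long start = timestamp - Math.floorMod(timestamp, SIZE_MS);
            if (isClosed(start, streamTime)) {
                return null; // outside the grace period: dropped
            }
            return new long[] {start, counts.merge(start, 1L, Long::sum)};
        }
    }

    static final class Suppress {
        private final TreeMap<Long, Long> buffer = new TreeMap<>();
        private long streamTime = Long.MIN_VALUE;

        // Buffers an eager update (keeping only the latest per window) and
        // returns the final result of every window that has now closed.
        List<long[]> process(long[] update, long timestamp) {
            streamTime = Math.max(streamTime, timestamp);
            if (update != null) {
                buffer.put(update[0], update[1]);
            }
            List<long[]> closed = new ArrayList<>();
            while (!buffer.isEmpty() && isClosed(buffer.firstKey(), streamTime)) {
                Map.Entry<Long, Long> e = buffer.pollFirstEntry();
                closed.add(new long[] {e.getKey(), e.getValue()});
            }
            return closed;
        }
    }
}
```

Because the emit policy lives in its own stage, a sliding-window aggregation could keep the usual eager semantics, and users who want only final results would put the same kind of suppression step behind it.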


======

New point (4):
It would be nice to include some examples of user code that would use the
new API, which should include:
1. using the DSL with the sliding window definition
2. accessing the stored results of a sliding window aggregation via IQ
3. defining a custom processor to access sliding windows in a store

It generally helps reviewers wrap their heads around the proposal, and it
shakes out any design issues that would otherwise only come up during
implementation/testing/review.
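Alongside those snippets, a brute-force reference for the expected results can help check the semantics. The plain-Java sketch below is neither Streams code nor the KIP's algorithm; it assumes each record at time t defines one window [t - timeDifferenceMs, t] with both bounds inclusive, and it counts every record regardless of arrival order, i.e. the final result once all in-grace records have arrived. The class name and the exact window boundaries are assumptions:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical brute-force oracle, not an efficient implementation:
// each record at time t defines the window [t - timeDifferenceMs, t]
// (both bounds inclusive), and that window's result is a count.
final class SlidingCountReference {
    static List<String> windowCounts(long[] timestamps, long timeDifferenceMs) {
        List<String> results = new ArrayList<>();
        for (long end : timestamps) {
            long start = end - timeDifferenceMs;
            long count = 0;
            // Scan every record, so arrival order does not affect the result.
            for (long ts : timestamps) {
                if (ts >= start && ts <= end) {
                    count++;
                }
            }
            results.add("[" + start + "," + end + "]=" + count);
        }
        return results;
    }
}
```

It produces one window per record, which illustrates the growth in window count that Guozhang points out below; the quadratic loop is only meant as a test oracle.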

Thanks again for the awesome proposal!
-John


On Tue, Jul 14, 2020, at 12:31, Guozhang Wang wrote:
> Hello Leah,
> 
> Thanks for the nicely written KIP. A few thoughts:
> 
> 1) I echo the other reviewers' comments regarding the typing: why extend
> TimeWindow instead of just extending Window?
> 
> 2) I also feel that the emitting policy for this type of windowed aggregation
> may need to differ from the existing ones. The existing policy is very
> simple: emit every time a window gets updated, and emit every time
> out-of-order data arrives within the grace period. That works because for
> time windows the window close time strictly depends on the window start
> time, which is fixed, while for session windows, although the window
> open/close times are also data-dependent, they change relatively
> infrequently compared to sliding windows. For this KIP, since each new
> record would create a new sliding window, the number of windows maintained
> logically could be much larger, and hence emitting on each update may be
> too aggressive.
> 
> 3) Although the KIP itself should focus on user-facing interfaces, I'd
> suggest we create a child page of KIP-450 discussing its implementation
> as well, since some of that may drive the interface design. E.g.,
> personally I think having a combiner interface in addition to the
> aggregator would be useful, but that's based on my 2 cents about the
> implementation design (I once created a child page describing it:
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> ).
> 
> 
> Guozhang
> 
> 
> 
> 
> On Tue, Jul 14, 2020 at 5:31 AM Bruno Cadonna <br...@confluent.io> wrote:
> 
> > Hi Leah,
> >
> > Thank you for the KIP!
> >
> > Here is my feedback:
> >
> > 1. The KIP would benefit from some code examples that show how to use
> > sliding windows in aggregations.
> >
> > 2. The different sliding windows in Figures 1 and 2 are really hard to
> > distinguish. Could you please make them easier to tell apart
> > graphically? For example, you could draw the frames of consecutive
> > windows offset from each other.
> >
> > 3. I agree with Matthias that extending Windows<TimeWindow> does not
> > seem to be the best approach. What would be the result of
> > windowsFor()?
> >
> > 4. In the section "Public Interfaces" you should remove implementation
> > details like private constructors and private fields.
> >
> > 5. Do we need a new store interface or can we use WindowStore? Some
> > words about that would be informative.
> >
> > 6. @Matthias, if the subtractor is not strictly needed, I would skip it
> > for now and add it later.
> >
> > 7. I also agree that having a section that describes how to handle
> > out-of-order records would be good to understand what is still missing
> > and what we can reuse.
> >
> > Best,
> > Bruno
> >
> > On Sat, Jul 11, 2020 at 9:16 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > Leah,
> > >
> > > thanks for your update. However, it does not completely answer my
> > > question.
> > >
> > > In our current window implementations, we emit a window result update
> > > record (ie, early/partial result) for each input record. When an
> > > out-of-order record arrives, we just update the corresponding old window
> > > and emit another update.
> > >
> > > It's unclear from the KIP whether you propose the same emit strategy. For
> > > sliding windows, it might be worth considering a different emit strategy
> > > that only emits the final result (ie, after the grace period has
> > > passed)?
> > >
> > >
> > >
> > > Boyang also raises a good point that relates to my point from above
> > > about pre-aggregations and storage layout. Our current time windows are
> > > all pre-aggregated and stored in parallel. We can also look up windows
> > > efficiently, as we can compute the windowed key given the input record
> > > key and timestamp based on the window definition.
> > >
> > > However, for sliding windows, window boundaries are data dependent and
> > > thus we cannot compute them upfront. So how can we "find" existing
> > > windows efficiently? Furthermore, out-of-order data would create new
> > > windows in the past and we need to be able to handle this case.
> > >
> > > Thus, to handle out-of-order data correctly, we need to store all raw
> > > input events. Additionally, we could also store pre-aggregated results
> > > if we think it's beneficial. If we apply the "emit only final results"
> > > strategy, storing pre-aggregated results would not be necessary though.
> > >
> > >
> > > Btw: for sliding windows it might also be useful to consider allowing
> > > users to supply a `Subtractor`; this subtractor could be applied to
> > > the current window result (in case we store it) when a record drops out
> > > of the window. Of course, not all aggregation functions are subtractable,
> > > so we could also consider this a follow-up task and not include it in
> > > this KIP for now. Thoughts?
> > >
> > >
> > >
> > > I was also thinking about the type hierarchy. I am not sure extending
> > > TimeWindow is the best approach. For TimeWindows, we can pre-compute
> > > window boundaries (cf `windowsFor()`) while for a sliding window the
> > > boundaries are data dependent. Session windows are also data dependent
> > > and thus they don't inherit from TimeWindow. (Maybe check out the KIP
> > > that added session windows? It could provide some good insights.) I
> > > believe the same rationale applies to sliding windows?
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > >
> > > On 7/10/20 12:47 PM, Boyang Chen wrote:
> > > > Thanks Leah and Sophie for the KIP.
> > > >
> > > > 1. I'm a bit surprised that we don't have an advance time. Could we
> > > > elaborate on how the storage layer is structured?
> > > >
> > > > 2. IIUC, there will be extra cost in terms of fetching aggregation
> > > > results, since we couldn't pre-aggregate until the user asks for it.
> > > > It would be good to discuss that as well.
> > > >
> > > > 3. We haven't discussed the possibility of supporting sliding windows
> > > > implicitly. For a user who actually uses a hopping window, Streams
> > > > could detect the inefficiency by computing the window_size/advance_time
> > > > ratio and deciding whether the write amplification is too high compared
> > > > with some configured threshold. The benefit of doing so is that existing
> > > > Streams users wouldn't need to change their code or learn a new API;
> > > > they would only need to upgrade the Streams library to get the benefits
> > > > for their inefficient hopping window implementations. There might be
> > > > some compatibility issues for sure, but it's worth listing them out to
> > > > weigh the trade-offs.
> > > >
> > > > Boyang
> > > >
> > > > On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
> > > >
> > > >> Hey Matthias,
> > > >>
> > > >> Thanks for pointing that out. I added the following to the Proposed
> > > >> Changes section of the KIP:
> > > >>
> > > >> "Records that come out of order will be processed the same way as
> > > >> in-order records, as long as they fall within the grace period. Any new
> > > >> windows created by the late record will still be created, and the
> > > >> existing windows that are changed by the late record will be updated.
> > > >> Any record that falls outside of the grace period (either user defined
> > > >> or default) will be discarded."
> > > >>
> > > >> All the best,
> > > >> Leah
> > > >>
> > > >> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
> > > >>
> > > >>> Leah,
> > > >>>
> > > >>> thanks a lot for the KIP. Very well written.
> > > >>>
> > > >>> The KIP does not talk about the handling of out-of-order data though.
> > > >>> How do you propose to address this?
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 7/8/20 5:33 PM, Leah Thomas wrote:
> > > >>>> Hi all,
> > > >>>> I'd like to kick off the discussion for KIP-450, adding sliding window
> > > >>>> aggregation support to Kafka Streams.
> > > >>>>
> > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > > >>>>
> > > >>>> Let me know what you think,
> > > >>>> Leah
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > >
> >
> 
> 
> -- 
> -- Guozhang
>
