Hi folks,

I've been thinking more about this KIP and my understanding is that we want
to introduce a new SlidingWindow notion for aggregation since our current
TimeWindow aggregation is not very efficient with very small steps. So I'm
wondering that rather than introducing a new implementation mechanism, what
if we just optimize the TimeWindowed aggregations where we can allow a very
small advance step (which would in practice sufficient mimic the sliding
window behavior) compared to the window length itself, e.g. a window length
of 10 minutes with 1 second advance.

I've quickly write up an alternative proposal for KIP-450 here:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
Please
let me know your thoughts.


Guozhang

On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks Sophie!
>
>
> Regarding (4), I am in favor to support both. Not sure if we can reuse
> existing window store (with enabling to store duplicates) for this case
> or not though, or if we need to design a new store to keep all raw records?
>
> Btw: for holistic aggregations, like media, we would need to support a
> different store layout for existing aggregations (time-window,
> session-window), too. Thus, if we add support for this, we might be able
> to kill two birds with one stone. Of course, we would still need new
> APIs for existing aggregations to allow users to pick between both cases.
>
> I only bring this up, because it might make sense to design the store in
> a way such that we can use it for all cases.
>
>
> About (3): atm we support wall-clock time via the corresponding
> `WallclockTimestampeExtractor`. Maybe Bill can elaborate a little bit
> more what he has in mind exactly, and why using this extractor would not
> meet the requirements for processing-time sliding windows?
>
>
> -Matthias
>
>
> On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > Regarding 4): yes I agree with you that invertibility is not a common
> > property for agg-functions. Just to be clear about our current APIs: for
> > stream.aggregate we only require a single Adder function, whereas for
> > table.aggregate we require both Adder and Subtractor, but these are not
> > used to leverage any properties just that the incoming table changelog
> > stream may contain "tombstones" and hence we need to negate the effect of
> > the previous record that has been deleted by this tombstone.
> >
> > What I'm proposing is exactly having two APIs, one for Adder only (like
> > other Streams aggregations) and one for Subtractor + Adder (for agg
> > functions users think are invertible) for efficiency. Some other
> frameworks
> > (e.g. Spark) have similar options for users and will recommend using the
> > latter so that some optimization in implementation can be done.
> >
> >
> > Guozhang
> >
> > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> >> Thanks for the feedback Matthias and Bill. After discussing offline we
> >> realized the type of windows I originally had in mind were quite
> different,
> >> and I agree now that the semantics outlined by Matthias are the
> direction
> >> to go in here. I will update the KIP accordingly with the new semantics
> >> (and corresponding design) and restart the discussion from there.
> >>
> >> In the meantime, to respond to some other points:
> >>
> >> 1) API:
> >>
> >> I propose adding only the one class -- public class SlidingWindows
> extends
> >> Windows<TimeWindow> {} --  so I do not believe we need any new Serdes?
> It
> >> will still be a fixed size TimeWindow, but handled a bit differently.
> I've
> >> updated the KIP to state explicitly all of the classes/methods being
> added
> >>
> >> 2) Zero grace period
> >>
> >> The "zero grace period" was essentially just consequence of my original
> >> definition for sliding windows; with the new semantics we can (and
> should)
> >> allow for a nonzero grace period
> >>
> >> 3) Wall-clock time
> >>
> >> Hm, I had not considered this yet but it may be a good idea to keep in
> mind
> >> while rethinking the design. To clarify, we don't support wall-clock
> based
> >> aggregations with hopping or tumbling windows though (yet?)
> >>
> >> 4) Commutative vs associative vs invertible aggregations
> >>
> >> I agree that it's reasonable to assume commutativity and associativity,
> but
> >> that's not the same as being subtractable -- that requires
> invertibility,
> >> which is broken by a lot of very simple functions and is not, I think,
> ok
> >> to assume. However we could consider adding a separate API which also
> takes
> >> a subtractor and corresponds to a completely different implementation.
> We
> >> could also consider an API that takes a function that aggregates two
> >> aggregates together in addition to the existing aggregator (which
> >> aggregates a single value with an existing aggregate) WDYT?
> >>
> >>
> >>
> >>
> >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for the KIP Sophie. Couple of comments:
> >>>
> >>> It's a little unclear to me, what public API you propose. It seems you
> >>> want to add
> >>>
> >>>> public class SlidingWindow extends TimeWindow {}
> >>>
> >>> and
> >>>
> >>>> public class SlidingWindows extends TimeWindows {} // or maybe
> `extends
> >>> Windows`
> >>>
> >>> If yes, should we add corresponding public Serdes classes?
> >>>
> >>> Also, can you list all newly added classes/methods explicitly in the
> >> wiki?
> >>>
> >>>
> >>> About the semantics of the operator.
> >>>
> >>>> "Only one single window is defined at a time,"
> >>>
> >>> Should this be "one window per key" instead?
> >>>
> >>> I agree that both window boundaries should be inclusive. However, I am
> >>> not sure about:
> >>>
> >>>> At most one record is forwarded when new data arrives
> >>>
> >>> (1) For what case, no output would be produced?
> >>>
> >>> (2) I think, if we advance in time, it can also happen that we emit
> >>> multiple records. If a window "slides" (not "hops"), we cannot just
> >>> advance it to the current record stream time but would need to emit
> more
> >>> result if records expire before the current input record is added. For
> >>> example, consider a window with size 5ms, and the following ts (all
> >>> records have the same key):
> >>>
> >>> 1 2 3 10 11
> >>>
> >>> This should result in windows:
> >>>
> >>> [1]
> >>> [1,2]
> >>> [1,2,3]
> >>> [2,3]
> >>> [3]
> >>> [10]
> >>> [10,11]
> >>>
> >>> Ie, when the record with ts=10 is processed, it will trigger the
> >>> computation of [2,3], [3] and [10].
> >>>
> >>>
> >>> About out-of-order handling: I am wondering, if the current design that
> >>> does not allow any grace period is too restrictive. Can you elaborate
> >>> more on the motivation for this suggestions?
> >>>
> >>>
> >>> Can you give more details about the "simple design"? Atm, it's not
> clear
> >>> to me how it works. I though we always need to store all raw values. If
> >>> we only store the current aggregate, would we end up with the same
> >>> inefficient solution as using a hopping window with advance 1ms?
> >>>
> >>>
> >>> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
> >>> bucket sizes, window size etc. The current proposal is a little unclear
> >>> to me, atm.
> >>>
> >>>
> >>> How are windows advance? Do you propose to advance all windows over all
> >>> keys at the same time, or would each window (per key) advance
> >>> independent from all other windows? What would be the pros/cons for
> both
> >>> approaches?
> >>>
> >>>
> >>> To add to Guozhang's comment: atm, DSL operators assume that aggregate
> >>> functions are commutative and associative. Hence, it seems ok to make
> >>> the same assumption for sliding window. Addressing holistic and
> >>> non-subtractable aggregations should be supported out of the box at
> some
> >>> point, too, but this would be a different KIP adding this to all
> >>> existing aggregations.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> >>>> Hi Sophie,
> >>>>
> >>>> Thanks for the proposed KIP. I've made a pass over it and here are
> some
> >>>> thoughts:
> >>>>
> >>>> 1. "The window size is effectively the grace and retention period".
> The
> >>>> grace time is defined as "the time to admit late-arriving events after
> >>> the
> >>>> end of the window." hence it is the additional time beyond the window
> >>> size.
> >>>> I guess your were trying to say it should be zero?
> >>>>
> >>>> Also for retention period, it is not a notion of the window spec any
> >>> more,
> >>>> but only for the window store itself. So I'd suggest talking about
> >> window
> >>>> size here, and note that store retention time cannot be controlled via
> >>>> window spec at all.
> >>>>
> >>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
> >> a
> >>>> bucket, so I'd assume you will expire one bucket as a whole when its
> >> end
> >>>> time is smaller than the current window's starting time, right?
> >>>>
> >>>> 3. Also in your algorithm how to choose "M" seems tricky, would it be
> a
> >>>> configurable parameter exposed to users or is it abstracted away and
> >> only
> >>>> being selected internally?
> >>>>
> >>>> 4. "There is some tradeoff between purely optimizing " seems
> incomplete
> >>>> paragraph?
> >>>>
> >>>> 5. Meta comment: for many aggregations it is commutative and
> >> associative
> >>> so
> >>>> we can require users to pass in a "substract" function as well. Given
> >>> these
> >>>> two function I think we can propose two set of APIs, 1) with the adder
> >>> and
> >>>> subtractor and 2) with the added only (if the aggregate logic is not
> >>> comm.
> >>>> and assoc.).
> >>>>
> >>>> We just maintain an aggregate value for each bucket (call it
> >>>> bucket_aggregate) plus for the whole window (call it total_aggregate),
> >>> i.e.
> >>>> at most M + 1 values per key. We use the total_aggregate for queries,
> >> and
> >>>> each update will cause 2 writes (to the bucket and to the total
> >>> aggregate).
> >>>>
> >>>> And with 1) when expiring the oldest bucket we simply call
> >>>> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> >>>> oldest bucket we can re-compute the total_aggregate by
> >>>> sum(bucket_aggregate) over other buckets again.
> >>>>
> >>>> 6. Meta comment: it is reasonable to assume in practice
> out-of-ordering
> >>>> data is not very common, hence most of the updates will be falling
> into
> >>> the
> >>>> latest bucket. So I'm wondering if it makes sense to always store the
> >>> first
> >>>> bucket in memory while making other buckets optionally on persistent
> >>>> storage. In practice, as long as M is large enough (we probably need
> it
> >>> to
> >>>> be large enough to have sufficiently sensitive expiration anyways)
> then
> >>>> each bucket's aggregate data is small enough to be in memory.
> >>>>
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hello all,
> >>>>>
> >>>>> I would like to kick off discussion of this KIP aimed at providing
> >>> sliding
> >>>>> window semantics to DSL aggregations.
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>>>
> >>>>> Please take a look and share any thoughts you have regarding the API,
> >>>>> semantics, design, etc!
> >>>>>
> >>>>> I also have a POC PR open with the "naive" implementation for your
> >>>>> reference: https://github.com/apache/kafka/pull/6549
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >
> >
>
>

-- 
-- Guozhang

Reply via email to