Hello John,

I like your idea of adding a new Combinator interface better! In addition
to your arguments, we can also leverage on each overloaded function that
users supplies for different aggregation implementation (i.e. if combinator
is provided we can do window-slicing, otherwise we follow the current
approach). This is similar as existing optimizations in other frameworks
like Spark.

Will update the corresponding wiki page.

Guozhang


On Fri, Sep 6, 2019 at 11:08 AM John Roesler <j...@confluent.io> wrote:

> Thanks for this idea, Guozhang, it does seem to be a nice way to solve
> the problem.
>
> I'm a _little_ concerned about the interface, though. It might be
> better to just add a new argument to a new method overload like
> `(initializer, aggregator, merger/combinator/whatever)`.
>
> Two reasons come to mind for this:
> 1) CombineAggregator is no longer a functional interface, so users
> have to switch to anonymous classes
> 2) there's a discoverability problem, because the API doesn't
> advertise CombineAggregator anywhere, it's just a magic parameter you
> can pass to get more efficient executions
>
> On the other hand, adding an argument (initializer, aggregator,
> merger/combinator/whatever) lets you supply lambdas for all the args,
> and also makes it clear that you're getting different (more efficient)
> execution behavior.
>
> WDYT?
>
> Thanks again,
> -John
>
>
> On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hi folks,
> >
> > I've been thinking more about this KIP and my understanding is that we
> want
> > to introduce a new SlidingWindow notion for aggregation since our current
> > TimeWindow aggregation is not very efficient with very small steps. So
> I'm
> > wondering that rather than introducing a new implementation mechanism,
> what
> > if we just optimize the TimeWindowed aggregations where we can allow a
> very
> > small advance step (which would in practice sufficient mimic the sliding
> > window behavior) compared to the window length itself, e.g. a window
> length
> > of 10 minutes with 1 second advance.
> >
> > I've quickly write up an alternative proposal for KIP-450 here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
> > Please
> > let me know your thoughts.
> >
> >
> > Guozhang
> >
> > On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks Sophie!
> > >
> > >
> > > Regarding (4), I am in favor to support both. Not sure if we can reuse
> > > existing window store (with enabling to store duplicates) for this case
> > > or not though, or if we need to design a new store to keep all raw
> records?
> > >
> > > Btw: for holistic aggregations, like media, we would need to support a
> > > different store layout for existing aggregations (time-window,
> > > session-window), too. Thus, if we add support for this, we might be
> able
> > > to kill two birds with one stone. Of course, we would still need new
> > > APIs for existing aggregations to allow users to pick between both
> cases.
> > >
> > > I only bring this up, because it might make sense to design the store
> in
> > > a way such that we can use it for all cases.
> > >
> > >
> > > About (3): atm we support wall-clock time via the corresponding
> > > `WallclockTimestampeExtractor`. Maybe Bill can elaborate a little bit
> > > more what he has in mind exactly, and why using this extractor would
> not
> > > meet the requirements for processing-time sliding windows?
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > > > Regarding 4): yes I agree with you that invertibility is not a common
> > > > property for agg-functions. Just to be clear about our current APIs:
> for
> > > > stream.aggregate we only require a single Adder function, whereas for
> > > > table.aggregate we require both Adder and Subtractor, but these are
> not
> > > > used to leverage any properties just that the incoming table
> changelog
> > > > stream may contain "tombstones" and hence we need to negate the
> effect of
> > > > the previous record that has been deleted by this tombstone.
> > > >
> > > > What I'm proposing is exactly having two APIs, one for Adder only
> (like
> > > > other Streams aggregations) and one for Subtractor + Adder (for agg
> > > > functions users think are invertible) for efficiency. Some other
> > > frameworks
> > > > (e.g. Spark) have similar options for users and will recommend using
> the
> > > > latter so that some optimization in implementation can be done.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
> > > sop...@confluent.io>
> > > > wrote:
> > > >
> > > >> Thanks for the feedback Matthias and Bill. After discussing offline
> we
> > > >> realized the type of windows I originally had in mind were quite
> > > different,
> > > >> and I agree now that the semantics outlined by Matthias are the
> > > direction
> > > >> to go in here. I will update the KIP accordingly with the new
> semantics
> > > >> (and corresponding design) and restart the discussion from there.
> > > >>
> > > >> In the meantime, to respond to some other points:
> > > >>
> > > >> 1) API:
> > > >>
> > > >> I propose adding only the one class -- public class SlidingWindows
> > > extends
> > > >> Windows<TimeWindow> {} --  so I do not believe we need any new
> Serdes?
> > > It
> > > >> will still be a fixed size TimeWindow, but handled a bit
> differently.
> > > I've
> > > >> updated the KIP to state explicitly all of the classes/methods being
> > > added
> > > >>
> > > >> 2) Zero grace period
> > > >>
> > > >> The "zero grace period" was essentially just consequence of my
> original
> > > >> definition for sliding windows; with the new semantics we can (and
> > > should)
> > > >> allow for a nonzero grace period
> > > >>
> > > >> 3) Wall-clock time
> > > >>
> > > >> Hm, I had not considered this yet but it may be a good idea to keep
> in
> > > mind
> > > >> while rethinking the design. To clarify, we don't support wall-clock
> > > based
> > > >> aggregations with hopping or tumbling windows though (yet?)
> > > >>
> > > >> 4) Commutative vs associative vs invertible aggregations
> > > >>
> > > >> I agree that it's reasonable to assume commutativity and
> associativity,
> > > but
> > > >> that's not the same as being subtractable -- that requires
> > > invertibility,
> > > >> which is broken by a lot of very simple functions and is not, I
> think,
> > > ok
> > > >> to assume. However we could consider adding a separate API which
> also
> > > takes
> > > >> a subtractor and corresponds to a completely different
> implementation.
> > > We
> > > >> could also consider an API that takes a function that aggregates two
> > > >> aggregates together in addition to the existing aggregator (which
> > > >> aggregates a single value with an existing aggregate) WDYT?
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <
> matth...@confluent.io>
> > > >> wrote:
> > > >>
> > > >>> Thanks for the KIP Sophie. Couple of comments:
> > > >>>
> > > >>> It's a little unclear to me, what public API you propose. It seems
> you
> > > >>> want to add
> > > >>>
> > > >>>> public class SlidingWindow extends TimeWindow {}
> > > >>>
> > > >>> and
> > > >>>
> > > >>>> public class SlidingWindows extends TimeWindows {} // or maybe
> > > `extends
> > > >>> Windows`
> > > >>>
> > > >>> If yes, should we add corresponding public Serdes classes?
> > > >>>
> > > >>> Also, can you list all newly added classes/methods explicitly in
> the
> > > >> wiki?
> > > >>>
> > > >>>
> > > >>> About the semantics of the operator.
> > > >>>
> > > >>>> "Only one single window is defined at a time,"
> > > >>>
> > > >>> Should this be "one window per key" instead?
> > > >>>
> > > >>> I agree that both window boundaries should be inclusive. However,
> I am
> > > >>> not sure about:
> > > >>>
> > > >>>> At most one record is forwarded when new data arrives
> > > >>>
> > > >>> (1) For what case, no output would be produced?
> > > >>>
> > > >>> (2) I think, if we advance in time, it can also happen that we emit
> > > >>> multiple records. If a window "slides" (not "hops"), we cannot just
> > > >>> advance it to the current record stream time but would need to emit
> > > more
> > > >>> result if records expire before the current input record is added.
> For
> > > >>> example, consider a window with size 5ms, and the following ts (all
> > > >>> records have the same key):
> > > >>>
> > > >>> 1 2 3 10 11
> > > >>>
> > > >>> This should result in windows:
> > > >>>
> > > >>> [1]
> > > >>> [1,2]
> > > >>> [1,2,3]
> > > >>> [2,3]
> > > >>> [3]
> > > >>> [10]
> > > >>> [10,11]
> > > >>>
> > > >>> Ie, when the record with ts=10 is processed, it will trigger the
> > > >>> computation of [2,3], [3] and [10].
> > > >>>
> > > >>>
> > > >>> About out-of-order handling: I am wondering, if the current design
> that
> > > >>> does not allow any grace period is too restrictive. Can you
> elaborate
> > > >>> more on the motivation for this suggestions?
> > > >>>
> > > >>>
> > > >>> Can you give more details about the "simple design"? Atm, it's not
> > > clear
> > > >>> to me how it works. I though we always need to store all raw
> values. If
> > > >>> we only store the current aggregate, would we end up with the same
> > > >>> inefficient solution as using a hopping window with advance 1ms?
> > > >>>
> > > >>>
> > > >>> For the O(sqrt(N)) proposal: can you maybe add an example with
> concrete
> > > >>> bucket sizes, window size etc. The current proposal is a little
> unclear
> > > >>> to me, atm.
> > > >>>
> > > >>>
> > > >>> How are windows advance? Do you propose to advance all windows
> over all
> > > >>> keys at the same time, or would each window (per key) advance
> > > >>> independent from all other windows? What would be the pros/cons for
> > > both
> > > >>> approaches?
> > > >>>
> > > >>>
> > > >>> To add to Guozhang's comment: atm, DSL operators assume that
> aggregate
> > > >>> functions are commutative and associative. Hence, it seems ok to
> make
> > > >>> the same assumption for sliding window. Addressing holistic and
> > > >>> non-subtractable aggregations should be supported out of the box at
> > > some
> > > >>> point, too, but this would be a different KIP adding this to all
> > > >>> existing aggregations.
> > > >>>
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>>
> > > >>>
> > > >>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> > > >>>> Hi Sophie,
> > > >>>>
> > > >>>> Thanks for the proposed KIP. I've made a pass over it and here are
> > > some
> > > >>>> thoughts:
> > > >>>>
> > > >>>> 1. "The window size is effectively the grace and retention
> period".
> > > The
> > > >>>> grace time is defined as "the time to admit late-arriving events
> after
> > > >>> the
> > > >>>> end of the window." hence it is the additional time beyond the
> window
> > > >>> size.
> > > >>>> I guess your were trying to say it should be zero?
> > > >>>>
> > > >>>> Also for retention period, it is not a notion of the window spec
> any
> > > >>> more,
> > > >>>> but only for the window store itself. So I'd suggest talking about
> > > >> window
> > > >>>> size here, and note that store retention time cannot be
> controlled via
> > > >>>> window spec at all.
> > > >>>>
> > > >>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to
> expire
> > > >> a
> > > >>>> bucket, so I'd assume you will expire one bucket as a whole when
> its
> > > >> end
> > > >>>> time is smaller than the current window's starting time, right?
> > > >>>>
> > > >>>> 3. Also in your algorithm how to choose "M" seems tricky, would
> it be
> > > a
> > > >>>> configurable parameter exposed to users or is it abstracted away
> and
> > > >> only
> > > >>>> being selected internally?
> > > >>>>
> > > >>>> 4. "There is some tradeoff between purely optimizing " seems
> > > incomplete
> > > >>>> paragraph?
> > > >>>>
> > > >>>> 5. Meta comment: for many aggregations it is commutative and
> > > >> associative
> > > >>> so
> > > >>>> we can require users to pass in a "substract" function as well.
> Given
> > > >>> these
> > > >>>> two function I think we can propose two set of APIs, 1) with the
> adder
> > > >>> and
> > > >>>> subtractor and 2) with the added only (if the aggregate logic is
> not
> > > >>> comm.
> > > >>>> and assoc.).
> > > >>>>
> > > >>>> We just maintain an aggregate value for each bucket (call it
> > > >>>> bucket_aggregate) plus for the whole window (call it
> total_aggregate),
> > > >>> i.e.
> > > >>>> at most M + 1 values per key. We use the total_aggregate for
> queries,
> > > >> and
> > > >>>> each update will cause 2 writes (to the bucket and to the total
> > > >>> aggregate).
> > > >>>>
> > > >>>> And with 1) when expiring the oldest bucket we simply call
> > > >>>> subtract(total_aggregate, bucket_aggregate); with 2) when
> expiring the
> > > >>>> oldest bucket we can re-compute the total_aggregate by
> > > >>>> sum(bucket_aggregate) over other buckets again.
> > > >>>>
> > > >>>> 6. Meta comment: it is reasonable to assume in practice
> > > out-of-ordering
> > > >>>> data is not very common, hence most of the updates will be falling
> > > into
> > > >>> the
> > > >>>> latest bucket. So I'm wondering if it makes sense to always store
> the
> > > >>> first
> > > >>>> bucket in memory while making other buckets optionally on
> persistent
> > > >>>> storage. In practice, as long as M is large enough (we probably
> need
> > > it
> > > >>> to
> > > >>>> be large enough to have sufficiently sensitive expiration anyways)
> > > then
> > > >>>> each bucket's aggregate data is small enough to be in memory.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <
> > > >> sop...@confluent.io>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hello all,
> > > >>>>>
> > > >>>>> I would like to kick off discussion of this KIP aimed at
> providing
> > > >>> sliding
> > > >>>>> window semantics to DSL aggregations.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>
> > > >>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > > >>>>>
> > > >>>>> Please take a look and share any thoughts you have regarding the
> API,
> > > >>>>> semantics, design, etc!
> > > >>>>>
> > > >>>>> I also have a POC PR open with the "naive" implementation for
> your
> > > >>>>> reference: https://github.com/apache/kafka/pull/6549
> > > >>>>>
> > > >>>>> Cheers,
> > > >>>>> Sophie
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > > >
> > >
> > >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang

Reply via email to