From my point of view, a Tumbling/Hopping window has different semantics than a Sliding-Window, and hence, I am not convinced atm that it's a good idea to use 1ms-hopping-windows.
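To make the difference concrete before going through the points below, here is a small plain-Python sketch (not Kafka Streams code; window size 5ms and records at ts 5 and 7, matching the example in point (2) below). It enumerates one hopping-window instance per 1ms start that contains data, but only one sliding-window instance per distinct window content, taking the earliest window for each content:

```python
SIZE = 5          # window size in ms
events = [5, 7]   # record timestamps

# Hopping windows, 1ms advance, bounds [start, start+SIZE):
# one instance per start whose span contains at least one record.
hopping = [(s, s + SIZE) for s in range(0, 13)
           if any(s <= t < s + SIZE for t in events)]

# Sliding windows, both bounds inclusive [start, start+SIZE-1]:
# a new instance only where the content changes (including becoming empty).
sliding = []
prev = ()
for s in range(0, 14):
    content = tuple(t for t in events if s <= t <= s + SIZE - 1)
    if content != prev:
        sliding.append((s, s + SIZE - 1))
        prev = content
```

This yields the seven hopping instances [1,6) ... [7,12) but only four sliding instances, the last one being the empty window emitted when record 7 drops out.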
(1) I think that the window bounds are different, ie, while a time-window has the lower start-time as an inclusive bound and the larger end-time as an exclusive bound, for a sliding-window both bounds are inclusive:

5ms Hopping-Window with 1ms advance: [0,5), [1,6), [2,7)
5ms Sliding-Window: [0,4] [1,5] [2,6]

It might seem like a subtle difference, because the end-timestamp of each sliding-window is one smaller than that of the corresponding hopping-window (and hence both windows effectively span the same time); however, I think it is an important difference, as the result records will have windowed keys with correspondingly different start/end timestamps.

(2) A sliding-window should only emit data if its content changes, not for every ms instance. Assume records with timestamps 5 and 7. For hopping windows we would instantiate the following windows:

[1,6) [2,7) [3,8) [4,9) [5,10) [6,11) [7,12)

For sliding windows we only get 4 window instances:

[1,5] [3,7] [6,10] [8,12]

A sliding window is only created if its content changes, while a hopping window is created for each time-span that contains data. Note that a sliding window should also emit an empty aggregation when it becomes empty (as [8,12] does above).

(3) I am also not sure that the idea to use "panes" of size 1ms would really be efficient; note that we need panes of that size to mimic sliding-windows, and we cannot use larger panes. Hence, the data reduction from updating a single pane instead of all overlapping hopping-windows might not be significant enough. (I am not saying that we cannot use panes for tumbling/hopping windows; for those, panes can be much larger.)

-Matthias

On 9/9/19 1:54 PM, Guozhang Wang wrote:
> Hello John,
>
> I like your idea of adding a new Combinator interface better! In addition
> to your arguments, we can also leverage each overloaded function that
> users supply for different aggregation implementation (i.e.
if combinator
> is provided we can do window-slicing, otherwise we follow the current
> approach). This is similar to existing optimizations in other frameworks
> like Spark.
>
> Will update the corresponding wiki page.
>
> Guozhang
>
>
> On Fri, Sep 6, 2019 at 11:08 AM John Roesler <j...@confluent.io> wrote:
>
>> Thanks for this idea, Guozhang, it does seem to be a nice way to solve
>> the problem.
>>
>> I'm a _little_ concerned about the interface, though. It might be
>> better to just add a new argument to a new method overload like
>> `(initializer, aggregator, merger/combinator/whatever)`.
>>
>> Two reasons come to mind for this:
>> 1) CombineAggregator is no longer a functional interface, so users
>> have to switch to anonymous classes
>> 2) there's a discoverability problem, because the API doesn't
>> advertise CombineAggregator anywhere, it's just a magic parameter you
>> can pass to get more efficient executions
>>
>> On the other hand, adding an argument (initializer, aggregator,
>> merger/combinator/whatever) lets you supply lambdas for all the args,
>> and also makes it clear that you're getting different (more efficient)
>> execution behavior.
>>
>> WDYT?
>>
>> Thanks again,
>> -John
>>
>>
>> On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Hi folks,
>>>
>>> I've been thinking more about this KIP and my understanding is that we
>> want
>>> to introduce a new SlidingWindow notion for aggregation since our current
>>> TimeWindow aggregation is not very efficient with very small steps. So
>> I'm
>>> wondering, rather than introducing a new implementation mechanism,
>> what
>>> if we just optimize the TimeWindowed aggregations where we can allow a
>> very
>>> small advance step (which would in practice sufficiently mimic the sliding
>>> window behavior) compared to the window length itself, e.g. a window
>> length
>>> of 10 minutes with 1 second advance.
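To illustrate the window-slicing idea mentioned above (illustrative Python, not the Streams API; all names and numbers are invented for the sketch): with only a per-record adder, every overlapping window instance must fold all of its raw records, whereas a combinator over pre-aggregated panes lets each window be computed from the pane aggregates instead:

```python
# Hypothetical example: count records in 10ms windows advancing by 2ms.
events = [1, 3, 4, 9, 11, 15]
WINDOW, ADVANCE = 10, 2

def naive_counts():
    # adder-only: re-fold every raw record for every window instance
    return {s: sum(1 for t in events if s <= t < s + WINDOW)
            for s in range(0, 16, ADVANCE)}

def pane_counts():
    # pre-aggregate each non-overlapping 2ms pane once with the adder ...
    panes = {}
    for t in events:
        p = t - t % ADVANCE
        panes[p] = panes.get(p, 0) + 1
    # ... then merge the pane aggregates per window with the combinator
    # (for counts, the combinator is simply +)
    return {s: sum(panes.get(p, 0) for p in range(s, s + WINDOW, ADVANCE))
            for s in range(0, 16, ADVANCE)}
```

Both functions produce identical results; the pane version touches each raw record once and then only WINDOW/ADVANCE pane aggregates per window, which is where the speed-up for small advance steps would come from.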
>>>
>>> I've quickly written up an alternative proposal for KIP-450 here:
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>> Please
>>> let me know your thoughts.
>>>
>>>
>>> Guozhang
>>>
>>> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Thanks Sophie!
>>>>
>>>>
>>>> Regarding (4), I am in favor of supporting both. Not sure if we can reuse
>>>> the existing window store (with duplicates enabled) for this case
>>>> or not though, or if we need to design a new store to keep all raw
>> records?
>>>>
>>>> Btw: for holistic aggregations, like median, we would need to support a
>>>> different store layout for existing aggregations (time-window,
>>>> session-window), too. Thus, if we add support for this, we might be
>> able
>>>> to kill two birds with one stone. Of course, we would still need new
>>>> APIs for existing aggregations to allow users to pick between both
>> cases.
>>>>
>>>> I only bring this up, because it might make sense to design the store
>> in
>>>> a way such that we can use it for all cases.
>>>>
>>>>
>>>> About (3): atm we support wall-clock time via the corresponding
>>>> `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
>>>> more on what he has in mind exactly, and why using this extractor would
>> not
>>>> meet the requirements for processing-time sliding windows?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/16/19 10:16 AM, Guozhang Wang wrote:
>>>>> Regarding 4): yes, I agree with you that invertibility is not a common
>>>>> property for agg-functions.
Just to be clear about our current APIs:
>> for
>>>>> stream.aggregate we only require a single Adder function, whereas for
>>>>> table.aggregate we require both Adder and Subtractor; but these are
>> not
>>>>> used to leverage any algebraic properties, it is just that the incoming table
>> changelog
>>>>> stream may contain "tombstones" and hence we need to negate the
>> effect of
>>>>> the previous record that has been deleted by this tombstone.
>>>>>
>>>>> What I'm proposing is exactly having two APIs, one for Adder only
>> (like
>>>>> other Streams aggregations) and one for Subtractor + Adder (for agg
>>>> functions users think are invertible) for efficiency. Some other
>>>> frameworks
>>>>> (e.g. Spark) have similar options for users and will recommend using
>> the
>>>>> latter so that some optimization in the implementation can be done.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <
>>>> sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the feedback Matthias and Bill. After discussing offline
>> we
>>>>>> realized the type of windows I originally had in mind was quite
>>>> different,
>>>>>> and I agree now that the semantics outlined by Matthias are the
>>>> direction
>>>>>> to go in here. I will update the KIP accordingly with the new
>> semantics
>>>>>> (and corresponding design) and restart the discussion from there.
>>>>>>
>>>>>> In the meantime, to respond to some other points:
>>>>>>
>>>>>> 1) API:
>>>>>>
>>>>>> I propose adding only the one class -- public class SlidingWindows
>>>> extends
>>>>>> Windows<TimeWindow> {} -- so I do not believe we need any new
>> Serdes?
>>>> It
>>>>>> will still be a fixed-size TimeWindow, but handled a bit
>> differently.
>>>> I've
>>>>>> updated the KIP to state explicitly all of the classes/methods being
>>>> added.
>>>>>>
>>>>>> 2) Zero grace period
>>>>>>
>>>>>> The "zero grace period" was essentially just a consequence of my
>> original
>>>>>> definition for sliding windows; with the new semantics we can (and
>>>> should)
>>>>>> allow for a nonzero grace period.
>>>>>>
>>>>>> 3) Wall-clock time
>>>>>>
>>>>>> Hm, I had not considered this yet but it may be a good idea to keep
>> in
>>>> mind
>>>>>> while rethinking the design. To clarify, we don't support wall-clock
>>>> based
>>>>>> aggregations with hopping or tumbling windows though (yet?)
>>>>>>
>>>>>> 4) Commutative vs associative vs invertible aggregations
>>>>>>
>>>>>> I agree that it's reasonable to assume commutativity and
>> associativity,
>>>> but
>>>>>> that's not the same as being subtractable -- that requires
>>>> invertibility,
>>>>>> which is broken by a lot of very simple functions and is not, I
>> think,
>>>> ok
>>>>>> to assume. However, we could consider adding a separate API which
>> also
>>>> takes
>>>>>> a subtractor and corresponds to a completely different
>> implementation.
>>>> We
>>>>>> could also consider an API that takes a function that aggregates two
>>>>>> aggregates together, in addition to the existing aggregator (which
>>>>>> aggregates a single value with an existing aggregate). WDYT?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <
>> matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the KIP Sophie. Couple of comments:
>>>>>>>
>>>>>>> It's a little unclear to me what public API you propose. It seems
>> you
>>>>>>> want to add
>>>>>>>
>>>>>>>> public class SlidingWindow extends TimeWindow {}
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>> public class SlidingWindows extends TimeWindows {} // or maybe
>>>> `extends
>>>>>>> Windows`
>>>>>>>
>>>>>>> If yes, should we add corresponding public Serdes classes?
>>>>>>>
>>>>>>> Also, can you list all newly added classes/methods explicitly in
>> the
>>>>>> wiki?
>>>>>>>
>>>>>>>
>>>>>>> About the semantics of the operator.
>>>>>>>
>>>>>>>> "Only one single window is defined at a time,"
>>>>>>>
>>>>>>> Should this be "one window per key" instead?
>>>>>>>
>>>>>>> I agree that both window boundaries should be inclusive. However,
>> I am
>>>>>>> not sure about:
>>>>>>>
>>>>>>>> At most one record is forwarded when new data arrives
>>>>>>>
>>>>>>> (1) For what case would no output be produced?
>>>>>>>
>>>>>>> (2) I think, if we advance in time, it can also happen that we emit
>>>>>>> multiple records. If a window "slides" (not "hops"), we cannot just
>>>>>>> advance it to the current record stream time but would need to emit
>>>> more
>>>>>>> results if records expire before the current input record is added.
>> For
>>>>>>> example, consider a window with size 5ms, and the following ts (all
>>>>>>> records have the same key):
>>>>>>>
>>>>>>> 1 2 3 10 11
>>>>>>>
>>>>>>> This should result in windows:
>>>>>>>
>>>>>>> [1]
>>>>>>> [1,2]
>>>>>>> [1,2,3]
>>>>>>> [2,3]
>>>>>>> [3]
>>>>>>> [10]
>>>>>>> [10,11]
>>>>>>>
>>>>>>> Ie, when the record with ts=10 is processed, it will trigger the
>>>>>>> computation of [2,3], [3] and [10].
>>>>>>>
>>>>>>>
>>>>>>> About out-of-order handling: I am wondering if the current design
>> that
>>>>>>> does not allow any grace period is too restrictive. Can you
>> elaborate
>>>>>>> more on the motivation for this suggestion?
>>>>>>>
>>>>>>>
>>>>>>> Can you give more details about the "simple design"? Atm, it's not
>>>> clear
>>>>>>> to me how it works. I thought we always need to store all raw
>> values. If
>>>>>>> we only store the current aggregate, would we end up with the same
>>>>>>> inefficient solution as using a hopping window with advance 1ms?
>>>>>>>
>>>>>>>
>>>>>>> For the O(sqrt(N)) proposal: can you maybe add an example with
>> concrete
>>>>>>> bucket sizes, window size, etc.?
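Matthias's 1 2 3 10 11 example above can be traced with a short plain-Python sketch (not Kafka code; it assumes inclusive window bounds of size 5ms and emits a result whenever the window content changes as stream time advances, skipping empty windows as in the example):

```python
SIZE = 5
events = [1, 2, 3, 10, 11]   # in-order record timestamps, same key

emitted = []                  # window contents, in emission order
seen, prev_end, prev = [], None, ()
for t in events:
    seen.append(t)
    # slide the window end forward from just past the previous record's
    # window up to the new record's timestamp, emitting on content changes
    for end in range(t if prev_end is None else prev_end + 1, t + 1):
        content = [x for x in seen if end - SIZE + 1 <= x <= end]
        if tuple(content) != prev:
            if content:       # empty windows are skipped in this trace
                emitted.append(content)
            prev = tuple(content)
    prev_end = t
```

Running this reproduces the seven windows listed above, including the three ([2,3], [3], and [10]) triggered by the single record at ts=10.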
The current proposal is a little
>> unclear
>>>>>>> to me, atm.
>>>>>>>
>>>>>>>
>>>>>>> How are windows advanced? Do you propose to advance all windows
>> over all
>>>>>>> keys at the same time, or would each window (per key) advance
>>>>>>> independently of all other windows? What would be the pros/cons of
>>>> both
>>>>>>> approaches?
>>>>>>>
>>>>>>>
>>>>>>> To add to Guozhang's comment: atm, DSL operators assume that
>> aggregate
>>>>>>> functions are commutative and associative. Hence, it seems ok to
>> make
>>>>>>> the same assumption for sliding windows. Holistic and
>>>>>>> non-subtractable aggregations should be supported out of the box at
>>>> some
>>>>>>> point, too, but adding this to all existing aggregations would be a
>>>>>>> different KIP.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
>>>>>>>> Hi Sophie,
>>>>>>>>
>>>>>>>> Thanks for the proposed KIP. I've made a pass over it and here are
>>>> some
>>>>>>>> thoughts:
>>>>>>>>
>>>>>>>> 1. "The window size is effectively the grace and retention
>> period".
>>>> The
>>>>>>>> grace time is defined as "the time to admit late-arriving events
>> after
>>>>>>> the
>>>>>>>> end of the window," hence it is the additional time beyond the
>> window
>>>>>>> size.
>>>>>>>> I guess you were trying to say it should be zero?
>>>>>>>>
>>>>>>>> Also, the retention period is not a notion of the window spec
>> any
>>>>>>> more,
>>>>>>>> but only of the window store itself. So I'd suggest talking about
>>>>>> window
>>>>>>>> size here, and note that store retention time cannot be
>> controlled via
>>>>>>>> the window spec at all.
>>>>>>>>
>>>>>>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to
>> expire
>>>>>> a
>>>>>>>> bucket, so I'd assume you will expire one bucket as a whole when
>> its
>>>>>> end
>>>>>>>> time is smaller than the current window's starting time, right?
>>>>>>>>
>>>>>>>> 3.
Also, in your algorithm how to choose "M" seems tricky: would
>> it be
>>> a
>>>>>>>> configurable parameter exposed to users, or is it abstracted away
>> and
>>>>>> only
>>>>>>>> selected internally?
>>>>>>>>
>>>>>>>> 4. "There is some tradeoff between purely optimizing " seems to be an
>>>> incomplete
>>>>>>>> paragraph?
>>>>>>>>
>>>>>>>> 5. Meta comment: many aggregations are commutative and
>>>>>> associative,
>>>>>>> so
>>>>>>>> we can require users to pass in a "subtract" function as well.
>> Given
>>>>>>> these
>>>>>>>> two functions I think we can propose two sets of APIs: 1) with the
>> adder
>>>>>>> and
>>>>>>>> subtractor, and 2) with the adder only (if the aggregate logic is
>> not
>>>>>>> comm.
>>>>>>>> and assoc.).
>>>>>>>>
>>>>>>>> We just maintain an aggregate value for each bucket (call it
>>>>>>>> bucket_aggregate) plus one for the whole window (call it
>> total_aggregate),
>>>>>>> i.e.
>>>>>>>> at most M + 1 values per key. We use the total_aggregate for
>> queries,
>>>>>> and
>>>>>>>> each update will cause 2 writes (to the bucket and to the total
>>>>>>> aggregate).
>>>>>>>>
>>>>>>>> And with 1), when expiring the oldest bucket we simply call
>>>>>>>> subtract(total_aggregate, bucket_aggregate); with 2), when
>> expiring the
>>>>>>>> oldest bucket we re-compute the total_aggregate as the
>>>>>>>> sum(bucket_aggregate) over the other buckets again.
>>>>>>>>
>>>>>>>> 6. Meta comment: it is reasonable to assume that in practice
>>>> out-of-order
>>>>>>>> data is not very common, hence most of the updates will be falling
>>>> into
>>>>>>> the
>>>>>>>> latest bucket. So I'm wondering if it makes sense to always store
>> the
>>>>>>> first
>>>>>>>> bucket in memory while keeping the other buckets optionally on
>> persistent
>>>>>>>> storage.
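The bucket scheme from comments 2 and 5 above could be sketched as follows (hypothetical Python with invented names, not a concrete proposal): per key we keep at most M bucket aggregates plus one total; every update writes two values, and expiry is a single subtract when a subtractor exists, or a rescan of the surviving bucket aggregates when it does not:

```python
class BucketedWindowAggregate:
    """Sketch: M bucket aggregates plus a running total per key."""

    def __init__(self, bucket_span, add, subtract=None, init=0):
        self.span = bucket_span
        self.add, self.subtract, self.init = add, subtract, init
        self.buckets = {}     # bucket start time -> bucket_aggregate
        self.total = init     # total_aggregate, served on queries

    def update(self, ts, value):
        b = ts - ts % self.span
        self.buckets[b] = self.add(self.buckets.get(b, self.init), value)
        self.total = self.add(self.total, value)   # 2 writes per update

    def expire_oldest(self):
        bucket_agg = self.buckets.pop(min(self.buckets))
        if self.subtract is not None:              # invertible aggregation
            self.total = self.subtract(self.total, bucket_agg)
        else:                                      # adder-only fallback:
            self.total = self.init                 # recompute from buckets
            for agg in self.buckets.values():
                self.total = self.add(self.total, agg)
```

For example, a sum with buckets of span 2ms: after updates (0,1), (1,2), (4,10) the total is 13, and expiring the oldest bucket subtracts its aggregate 3 in one call instead of rescanning all raw records.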
In practice, as long as M is large enough (we probably
>> need
>>>> it
>>>>>>> to
>>>>>>>> be large enough to have sufficiently sensitive expiration anyway), then
>>>>>>>> each bucket's aggregate data is small enough to be in memory.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <
>>>>>> sop...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I would like to kick off discussion of this KIP, aimed at
>> providing
>>>>>>> sliding
>>>>>>>>> window semantics to DSL aggregations.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>>>>>>
>>>>>>>>> Please take a look and share any thoughts you have regarding the
>> API,
>>>>>>>>> semantics, design, etc!
>>>>>>>>>
>>>>>>>>> I also have a POC PR open with the "naive" implementation for
>> your
>>>>>>>>> reference: https://github.com/apache/kafka/pull/6549
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Sophie
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>
>
>