Hi folks,

I've been thinking more about this KIP, and my understanding is that we
want to introduce a new SlidingWindow notion for aggregation, since our
current TimeWindow aggregation is not very efficient with very small
advance steps. So I'm wondering: rather than introducing a new
implementation mechanism, what if we just optimize the TimeWindowed
aggregations to allow an advance step that is very small compared to the
window length itself (which would in practice sufficiently mimic
sliding-window behavior), e.g. a window length of 10 minutes with a
1-second advance?
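To put a number on the cost being discussed: with Kafka's epoch-aligned hopping windows, a record falls into size/advance windows, so a 10-minute window advancing by 1 second means each record updates 600 windows. A small plain-Java sketch of that enumeration (HoppingWindows is a made-up name for illustration, not a Kafka class):

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: enumerate the start times of the hopping windows that a
// record with the given timestamp falls into, using the epoch-aligned
// convention (window starts are multiples of the advance, end is exclusive).
public class HoppingWindows {
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // earliest start whose window [start, start + size) can still contain the record
        long first = Math.max(0, timestampMs - sizeMs + 1);
        // round up to the next multiple of the advance
        long start = ((first + advanceMs - 1) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 10-minute windows advancing by 1 second: each record updates 600 windows
        System.out.println(windowStartsFor(1_000_000L, 600_000L, 1_000L).size());
    }
}
```

This is exactly the write amplification that makes a tiny advance expensive without further optimization.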
I've quickly written up an alternative proposal for KIP-450 here:
https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation

Please let me know your thoughts.

Guozhang

On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks Sophie!
>
> Regarding (4), I am in favor of supporting both. I'm not sure whether we
> can reuse the existing window store (with storing duplicates enabled)
> for this case, or if we need to design a new store to keep all raw
> records.
>
> Btw: for holistic aggregations, like median, we would need to support a
> different store layout for existing aggregations (time-window,
> session-window), too. Thus, if we add support for this, we might be able
> to kill two birds with one stone. Of course, we would still need new
> APIs for existing aggregations to allow users to pick between both cases.
>
> I only bring this up because it might make sense to design the store in
> a way such that we can use it for all cases.
>
>
> About (3): atm we support wall-clock time via the corresponding
> `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
> more on what he has in mind exactly, and why using this extractor would
> not meet the requirements for processing-time sliding windows.
>
>
> -Matthias
>
>
> On 4/16/19 10:16 AM, Guozhang Wang wrote:
> > Regarding 4): yes, I agree with you that invertibility is not a common
> > property for agg-functions. Just to be clear about our current APIs:
> > for stream.aggregate we only require a single Adder function, whereas
> > for table.aggregate we require both an Adder and a Subtractor. These
> > are not used to leverage any algebraic properties; it is just that the
> > incoming table changelog stream may contain "tombstones", and hence we
> > need to negate the effect of the previous record that has been deleted
> > by each tombstone.
> >
> > What I'm proposing is exactly having two APIs: one with an Adder only
> > (like other Streams aggregations) and one with a Subtractor + Adder
> > (for agg functions users think are invertible), for efficiency. Some
> > other frameworks (e.g. Spark) have similar options for users and
> > recommend using the latter so that some optimization in the
> > implementation can be done.
> >
> >
> > Guozhang
> >
> > On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> >> Thanks for the feedback Matthias and Bill. After discussing offline we
> >> realized the type of windows I originally had in mind was quite
> >> different, and I agree now that the semantics outlined by Matthias are
> >> the direction to go in here. I will update the KIP accordingly with
> >> the new semantics (and corresponding design) and restart the
> >> discussion from there.
> >>
> >> In the meantime, to respond to some other points:
> >>
> >> 1) API:
> >>
> >> I propose adding only the one class -- public class SlidingWindows
> >> extends Windows<TimeWindow> {} -- so I do not believe we need any new
> >> Serdes. It will still be a fixed-size TimeWindow, but handled a bit
> >> differently. I've updated the KIP to state explicitly all of the
> >> classes/methods being added.
> >>
> >> 2) Zero grace period
> >>
> >> The "zero grace period" was essentially just a consequence of my
> >> original definition for sliding windows; with the new semantics we
> >> can (and should) allow for a nonzero grace period.
> >>
> >> 3) Wall-clock time
> >>
> >> Hm, I had not considered this yet, but it may be a good idea to keep
> >> in mind while rethinking the design. To clarify, we don't support
> >> wall-clock based aggregations with hopping or tumbling windows though
> >> (yet?)
> >>
> >> 4) Commutative vs. associative vs. invertible aggregations
> >>
> >> I agree that it's reasonable to assume commutativity and
> >> associativity, but that's not the same as being subtractable -- that
> >> requires invertibility, which is broken by a lot of very simple
> >> functions and is not, I think, ok to assume. However, we could
> >> consider adding a separate API which also takes a subtractor and
> >> corresponds to a completely different implementation. We could also
> >> consider an API that takes a function that aggregates two aggregates
> >> together, in addition to the existing aggregator (which aggregates a
> >> single value with an existing aggregate). WDYT?
> >>
> >>
> >> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for the KIP Sophie. Couple of comments:
> >>>
> >>> It's a little unclear to me what public API you propose. It seems you
> >>> want to add
> >>>
> >>>> public class SlidingWindow extends TimeWindow {}
> >>>
> >>> and
> >>>
> >>>> public class SlidingWindows extends TimeWindows {} // or maybe
> >>>> `extends Windows`
> >>>
> >>> If yes, should we add corresponding public Serdes classes?
> >>>
> >>> Also, can you list all newly added classes/methods explicitly in the
> >>> wiki?
> >>>
> >>>
> >>> About the semantics of the operator:
> >>>
> >>>> "Only one single window is defined at a time,"
> >>>
> >>> Should this be "one window per key" instead?
> >>>
> >>> I agree that both window boundaries should be inclusive. However, I
> >>> am not sure about:
> >>>
> >>>> At most one record is forwarded when new data arrives
> >>>
> >>> (1) For what case would no output be produced?
> >>>
> >>> (2) I think, if we advance in time, it can also happen that we emit
> >>> multiple records.
If a window "slides" (not "hops"), we cannot just
> >>> advance it to the current record's stream time, but would need to
> >>> emit more results if records expire before the current input record
> >>> is added. For example, consider a window with size 5ms, and the
> >>> following ts (all records have the same key):
> >>>
> >>> 1 2 3 10 11
> >>>
> >>> This should result in windows:
> >>>
> >>> [1]
> >>> [1,2]
> >>> [1,2,3]
> >>> [2,3]
> >>> [3]
> >>> [10]
> >>> [10,11]
> >>>
> >>> Ie, when the record with ts=10 is processed, it will trigger the
> >>> computation of [2,3], [3] and [10].
> >>>
> >>>
> >>> About out-of-order handling: I am wondering if the current design,
> >>> which does not allow any grace period, is too restrictive. Can you
> >>> elaborate more on the motivation for this suggestion?
> >>>
> >>>
> >>> Can you give more details about the "simple design"? Atm, it's not
> >>> clear to me how it works. I thought we always need to store all raw
> >>> values. If we only store the current aggregate, would we end up with
> >>> the same inefficient solution as using a hopping window with a 1ms
> >>> advance?
> >>>
> >>>
> >>> For the O(sqrt(N)) proposal: can you maybe add an example with
> >>> concrete bucket sizes, window size, etc.? The current proposal is a
> >>> little unclear to me atm.
> >>>
> >>>
> >>> How are windows advanced? Do you propose to advance all windows over
> >>> all keys at the same time, or would each window (per key) advance
> >>> independently from all other windows? What would be the pros/cons of
> >>> both approaches?
> >>>
> >>>
> >>> To add to Guozhang's comment: atm, DSL operators assume that
> >>> aggregate functions are commutative and associative. Hence, it seems
> >>> ok to make the same assumption for sliding windows. Addressing
> >>> holistic and non-subtractable aggregations should be supported out
> >>> of the box at some point, too, but this would be a different KIP
> >>> adding this to all existing aggregations.
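Matthias's emission pattern above can be reproduced with a small plain-Java simulation (hypothetical code, not from the KIP or the POC PR): per key, the window holds the records of the last 5ms, and an updated non-empty window is forwarded whenever a record arrives or an old record expires. Running it on the timestamps above yields exactly the seven windows listed:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustration only: simulate per-key sliding-window contents. A record at
// timestamp t expires once stream time reaches t + sizeMs; each expiry that
// leaves the window non-empty produces one more result, as does each arrival.
public class SlidingWindowSim {
    public static List<List<Long>> process(long[] timestamps, long sizeMs) {
        List<List<Long>> results = new ArrayList<>();
        Deque<Long> window = new ArrayDeque<>(); // timestamps currently in the window
        for (long ts : timestamps) {
            // advancing stream time to `ts` may expire several records
            while (!window.isEmpty() && window.peekFirst() + sizeMs <= ts) {
                window.removeFirst();
                if (!window.isEmpty()) {
                    results.add(new ArrayList<>(window));
                }
            }
            window.addLast(ts);
            results.add(new ArrayList<>(window));
        }
        return results;
    }
}
```

With timestamps {1, 2, 3, 10, 11} and size 5, processing ts=10 first expires records 1, 2, and 3 (emitting [2,3] and [3], skipping the empty window) before emitting [10].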
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
> >>>> Hi Sophie,
> >>>>
> >>>> Thanks for the proposed KIP. I've made a pass over it and here are
> >>>> some thoughts:
> >>>>
> >>>> 1. "The window size is effectively the grace and retention period".
> >>>> The grace time is defined as "the time to admit late-arriving events
> >>>> after the end of the window", hence it is the additional time beyond
> >>>> the window size. I guess you were trying to say it should be zero?
> >>>>
> >>>> Also, the retention period is not a notion of the window spec any
> >>>> more, but only of the window store itself. So I'd suggest talking
> >>>> about window size here, and noting that the store retention time
> >>>> cannot be controlled via the window spec at all.
> >>>>
> >>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to
> >>>> expire a bucket, so I'd assume you will expire one bucket as a
> >>>> whole when its end time is smaller than the current window's
> >>>> starting time, right?
> >>>>
> >>>> 3. Also, in your algorithm, how to choose "M" seems tricky; would
> >>>> it be a configurable parameter exposed to users, or is it
> >>>> abstracted away and only selected internally?
> >>>>
> >>>> 4. "There is some tradeoff between purely optimizing " seems to be
> >>>> an incomplete paragraph?
> >>>>
> >>>> 5. Meta comment: many aggregations are commutative and associative,
> >>>> so we can require users to pass in a "subtract" function as well.
> >>>> Given these two functions, I think we can propose two sets of APIs:
> >>>> 1) with the adder and subtractor, and 2) with the adder only (if
> >>>> the aggregate logic is not comm. and assoc.).
> >>>>
> >>>> We just maintain an aggregate value for each bucket (call it
> >>>> bucket_aggregate) plus one for the whole window (call it
> >>>> total_aggregate), i.e. at most M + 1 values per key.
We use the total_aggregate for queries, and
> >>>> each update will cause 2 writes (to the bucket and to the total
> >>>> aggregate).
> >>>>
> >>>> And with 1), when expiring the oldest bucket we simply call
> >>>> subtract(total_aggregate, bucket_aggregate); with 2), when expiring
> >>>> the oldest bucket we re-compute the total_aggregate by
> >>>> sum(bucket_aggregate) over the other buckets again.
> >>>>
> >>>> 6. Meta comment: it is reasonable to assume that in practice
> >>>> out-of-order data is not very common, hence most of the updates
> >>>> will be falling into the latest bucket. So I'm wondering if it
> >>>> makes sense to always store the latest bucket in memory while
> >>>> keeping the other buckets optionally on persistent storage. In
> >>>> practice, as long as M is large enough (we probably need it to be
> >>>> large enough to have sufficiently sensitive expiration anyway),
> >>>> each bucket's aggregate data is small enough to be in memory.
> >>>>
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hello all,
> >>>>>
> >>>>> I would like to kick off discussion of this KIP, aimed at providing
> >>>>> sliding window semantics to DSL aggregations.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >>>>>
> >>>>> Please take a look and share any thoughts you have regarding the
> >>>>> API, semantics, design, etc!
> >>>>>
> >>>>> I also have a POC PR open with the "naive" implementation for your
> >>>>> reference: https://github.com/apache/kafka/pull/6549
> >>>>>
> >>>>> Cheers,
> >>>>> Sophie


--
Guozhang
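Guozhang's bucketed scheme (points 5 and 6 in his Apr 9 mail) can be sketched in a few lines of plain Java, specialized to a sum so the subtractor is just subtraction. All names here are made up for illustration (this is not proposed Kafka API), and the sketch assumes in-order data, so every update lands in the latest bucket:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: keep one running aggregate per bucket plus a total for
// the whole window (at most M + 1 values per key). Each update causes two
// writes; expiring the oldest bucket applies the subtractor to the total.
public class BucketedAggregate {
    private final long bucketSizeMs;
    private final int numBuckets; // "M" in the discussion above
    private final Deque<long[]> buckets = new ArrayDeque<>(); // {bucketStart, bucket_aggregate}
    private long total = 0; // total_aggregate, used to answer queries

    public BucketedAggregate(long bucketSizeMs, int numBuckets) {
        this.bucketSizeMs = bucketSizeMs;
        this.numBuckets = numBuckets;
    }

    public void add(long timestampMs, long value) {
        long bucketStart = (timestampMs / bucketSizeMs) * bucketSizeMs;
        if (buckets.isEmpty() || buckets.peekLast()[0] < bucketStart) {
            buckets.addLast(new long[]{bucketStart, 0});
        }
        buckets.peekLast()[1] += value; // adder: write to the bucket...
        total += value;                 // ...and to the total (2 writes per update)
        while (buckets.size() > numBuckets) {
            // expire the oldest bucket as a whole, i.e.
            // subtract(total_aggregate, bucket_aggregate)
            total -= buckets.removeFirst()[1];
        }
    }

    public long query() {
        return total;
    }
}
```

A non-invertible aggregate (e.g. max) would instead re-compute the total over the surviving buckets on every expiry, which is the adder-only variant 2) above.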