From my point of view, a Tumbling/Hopping window has different semantics
than a Sliding-Window, and hence, I am not convinced atm that it's a
good idea to use 1ms-hopping-windows.



(1) I think that the window bounds are different, ie, while a
time-window has the lower start-time as an inclusive bound and the
larger end-time as an exclusive bound, for a sliding-window both bounds
are inclusive:

5ms Hopping-Window with 1ms advance: [0,5), [1,6), [2,7)
5ms Sliding-Window: [0,4] [1,5] [2,6]

It might seem like a subtle difference, because the end-timestamp of the
sliding-windows is one smaller compared to the corresponding
hopping-window (and hence both windows span the same time effectively);
however, I think it is an important difference, as the result record
will have windowed-keys with corresponding (ie, different) start/end
timestamps.
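To make the bound difference concrete, here is a minimal Java sketch. The class and method names are made up for illustration and are not part of Kafka Streams; it only assumes the two bound conventions described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Kafka Streams class.
class WindowBounds {

    // Hopping window: [start, start + size), the end is exclusive.
    static long hoppingEnd(long start, long sizeMs) {
        return start + sizeMs;
    }

    // Sliding window as described above: [start, start + size - 1], the end is inclusive.
    static long slidingEnd(long start, long sizeMs) {
        return start + sizeMs - 1;
    }

    // All millisecond timestamps covered by a window with the given bounds.
    static List<Long> covered(long start, long end, boolean endInclusive) {
        List<Long> result = new ArrayList<>();
        long last = endInclusive ? end : end - 1;
        for (long ts = start; ts <= last; ts++) {
            result.add(ts);
        }
        return result;
    }

    public static void main(String[] args) {
        long size = 5;
        for (long start = 0; start <= 2; start++) {
            long hopEnd = hoppingEnd(start, size);
            long slideEnd = slidingEnd(start, size);
            System.out.println("hopping [" + start + "," + hopEnd + ") covers "
                    + covered(start, hopEnd, false));
            System.out.println("sliding [" + start + "," + slideEnd + "] covers "
                    + covered(start, slideEnd, true));
        }
    }
}
```

Both variants cover the same timestamps, but the end timestamp that would end up in the windowed key differs by one.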



(2) A sliding-window should only emit data if the content changes, but
not for every ms-instance.

Assume records with timestamps (5) and (7). For hopping windows we would
instantiate the following windows:

[1,6) [2,7) [3,8) [4,9) [5,10) [6,11) [7,12)

For sliding windows we only get 3 non-empty window instances, followed
by an empty one:

[1,5]       [3,7]                     [7,11] [8,12]

A sliding window is only created if the content changes, while a hopping
window is created for each time-span that contains data. Note that the
sliding window should also emit an empty aggregation when it becomes empty.
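The hopping-window list above can be reproduced with a small Java sketch. It is an illustration only: the class and method names are invented here, and it assumes window starts are aligned to multiples of the advance and are never negative.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper for illustration only; not a Kafka Streams class.
class HoppingWindowInstances {

    // Start timestamps of all hopping windows [start, start + sizeMs)
    // that contain at least one of the given record timestamps.
    static TreeSet<Long> windowStarts(List<Long> timestamps, long sizeMs, long advanceMs) {
        TreeSet<Long> starts = new TreeSet<>();
        for (long ts : timestamps) {
            // Latest aligned window start that is <= ts.
            long lastStart = ts - (ts % advanceMs);
            // Walk back over every earlier window that still covers ts.
            for (long start = lastStart; start > ts - sizeMs && start >= 0; start -= advanceMs) {
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 5;
        for (long start : windowStarts(List.of(5L, 7L), size, 1)) {
            System.out.println("[" + start + "," + (start + size) + ")");
        }
    }
}
```

For records (5) and (7) this yields window starts 1 through 7, ie, the seven hopping windows listed above.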



(3) I am also not sure if the idea to use "panes" of size 1ms would
really be efficient; note that we need panes of that size to mimic
sliding-windows and we cannot have larger panes. Hence, the
data-reduction to only update a single pane instead of all overlapping
hopping-windows might not be significant enough. (I am not saying that we
cannot use panes for tumbling/hopping windows, but for those, panes can
be much larger).
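As a rough back-of-the-envelope sketch in Java: the names are invented for illustration, and it assumes the common pane rule that the pane size is the greatest common divisor of window size and advance, and that the advance divides the window size evenly.

```java
// Hypothetical arithmetic helper for illustration only; not a Kafka Streams class.
class PaneMath {

    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // Assumption: the largest usable pane is gcd(window size, advance).
    static long paneSizeMs(long windowSizeMs, long advanceMs) {
        return gcd(windowSizeMs, advanceMs);
    }

    // Number of panes that must be combined to produce one window result.
    static long panesPerWindow(long windowSizeMs, long advanceMs) {
        return windowSizeMs / paneSizeMs(windowSizeMs, advanceMs);
    }

    public static void main(String[] args) {
        long tenMinutes = 600_000L;
        // 1ms advance (mimicking a sliding window): panes cannot be larger than 1ms.
        System.out.println("advance 1ms:  pane=" + paneSizeMs(tenMinutes, 1)
                + "ms, panes/window=" + panesPerWindow(tenMinutes, 1));
        // 1min advance (a regular hopping window): panes can be a full minute.
        System.out.println("advance 1min: pane=" + paneSizeMs(tenMinutes, 60_000)
                + "ms, panes/window=" + panesPerWindow(tenMinutes, 60_000));
    }
}
```

With a 1ms advance each pane is so small that it rarely holds more than a record or two, which is why the reduction may not pay off in that case.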



-Matthias




On 9/9/19 1:54 PM, Guozhang Wang wrote:
> Hello John,
> 
> I like your idea of adding a new Combinator interface better! In addition
> to your arguments, we can also use the overload that users call to pick
> the aggregation implementation (i.e. if a combinator is provided we can
> do window-slicing, otherwise we follow the current approach). This is
> similar to existing optimizations in other frameworks like Spark.
> 
> Will update the corresponding wiki page.
> 
> Guozhang
> 
> 
> On Fri, Sep 6, 2019 at 11:08 AM John Roesler <j...@confluent.io> wrote:
> 
>> Thanks for this idea, Guozhang, it does seem to be a nice way to solve
>> the problem.
>>
>> I'm a _little_ concerned about the interface, though. It might be
>> better to just add a new argument to a new method overload like
>> `(initializer, aggregator, merger/combinator/whatever)`.
>>
>> Two reasons come to mind for this:
>> 1) CombineAggregator is no longer a functional interface, so users
>> have to switch to anonymous classes
>> 2) there's a discoverability problem, because the API doesn't
>> advertise CombineAggregator anywhere, it's just a magic parameter you
>> can pass to get more efficient executions
>>
>> On the other hand, adding an argument (initializer, aggregator,
>> merger/combinator/whatever) lets you supply lambdas for all the args,
>> and also makes it clear that you're getting different (more efficient)
>> execution behavior.
>>
>> WDYT?
>>
>> Thanks again,
>> -John
>>
>>
>> On Wed, Sep 4, 2019 at 7:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>> Hi folks,
>>>
>>> I've been thinking more about this KIP and my understanding is that we
>>> want to introduce a new SlidingWindow notion for aggregation since our
>>> current TimeWindow aggregation is not very efficient with very small
>>> steps. So I'm wondering whether, rather than introducing a new
>>> implementation mechanism, we could just optimize the TimeWindowed
>>> aggregations so that we can allow a very small advance step (which
>>> would in practice sufficiently mimic the sliding window behavior)
>>> compared to the window length itself, e.g. a window length of 10
>>> minutes with 1 second advance.
>>>
>>> I've quickly written up an alternative proposal for KIP-450 here:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/Optimize+Windowed+Aggregation
>>>
>>> Please let me know your thoughts.
>>>
>>>
>>> Guozhang
>>>
>>> On Tue, Apr 16, 2019 at 3:14 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Thanks Sophie!
>>>>
>>>>
>>>> Regarding (4), I am in favor of supporting both. Not sure if we can reuse
>>>> the existing window store (with storing duplicates enabled) for this
>>>> case or not though, or if we need to design a new store to keep all raw
>>>> records?
>>>>
>>>> Btw: for holistic aggregations, like median, we would need to support a
>>>> different store layout for existing aggregations (time-window,
>>>> session-window), too. Thus, if we add support for this, we might be able
>>>> to kill two birds with one stone. Of course, we would still need new
>>>> APIs for existing aggregations to allow users to pick between both cases.
>>>>
>>>> I only bring this up, because it might make sense to design the store in
>>>> a way such that we can use it for all cases.
>>>>
>>>>
>>>> About (3): atm we support wall-clock time via the corresponding
>>>> `WallclockTimestampExtractor`. Maybe Bill can elaborate a little bit
>>>> more what he has in mind exactly, and why using this extractor would not
>>>> meet the requirements for processing-time sliding windows?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/16/19 10:16 AM, Guozhang Wang wrote:
>>>>> Regarding 4): yes I agree with you that invertibility is not a common
>>>>> property for agg-functions. Just to be clear about our current APIs: for
>>>>> stream.aggregate we only require a single Adder function, whereas for
>>>>> table.aggregate we require both Adder and Subtractor. These are not
>>>>> used to leverage any algebraic properties, though; it is just that the
>>>>> incoming table changelog stream may contain "tombstones" and hence we
>>>>> need to negate the effect of the previous record that has been deleted
>>>>> by this tombstone.
>>>>>
>>>>> What I'm proposing is exactly having two APIs, one for Adder only (like
>>>>> other Streams aggregations) and one for Subtractor + Adder (for agg
>>>>> functions users think are invertible) for efficiency. Some other
>>>>> frameworks (e.g. Spark) have similar options for users and will
>>>>> recommend using the latter so that some optimization in implementation
>>>>> can be done.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Mon, Apr 15, 2019 at 12:29 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the feedback Matthias and Bill. After discussing offline we
>>>>>> realized the type of windows I originally had in mind were quite
>>>>>> different, and I agree now that the semantics outlined by Matthias are
>>>>>> the direction to go in here. I will update the KIP accordingly with the
>>>>>> new semantics (and corresponding design) and restart the discussion
>>>>>> from there.
>>>>>>
>>>>>> In the meantime, to respond to some other points:
>>>>>>
>>>>>> 1) API:
>>>>>>
>>>>>> I propose adding only the one class -- public class SlidingWindows
>>>>>> extends Windows<TimeWindow> {} -- so I do not believe we need any new
>>>>>> Serdes? It will still be a fixed size TimeWindow, but handled a bit
>>>>>> differently. I've updated the KIP to state explicitly all of the
>>>>>> classes/methods being added.
>>>>>>
>>>>>> 2) Zero grace period
>>>>>>
>>>>>> The "zero grace period" was essentially just a consequence of my
>>>>>> original definition for sliding windows; with the new semantics we can
>>>>>> (and should) allow for a nonzero grace period.
>>>>>>
>>>>>> 3) Wall-clock time
>>>>>>
>>>>>> Hm, I had not considered this yet but it may be a good idea to keep in
>>>>>> mind while rethinking the design. To clarify, we don't support
>>>>>> wall-clock based aggregations with hopping or tumbling windows though
>>>>>> (yet?)
>>>>>>
>>>>>> 4) Commutative vs associative vs invertible aggregations
>>>>>>
>>>>>> I agree that it's reasonable to assume commutativity and associativity,
>>>>>> but that's not the same as being subtractable -- that requires
>>>>>> invertibility, which is broken by a lot of very simple functions and is
>>>>>> not, I think, ok to assume. However we could consider adding a separate
>>>>>> API which also takes a subtractor and corresponds to a completely
>>>>>> different implementation. We could also consider an API that takes a
>>>>>> function that aggregates two aggregates together in addition to the
>>>>>> existing aggregator (which aggregates a single value with an existing
>>>>>> aggregate). WDYT?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 1:13 AM Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the KIP Sophie. Couple of comments:
>>>>>>>
>>>>>>> It's a little unclear to me what public API you propose. It seems you
>>>>>>> want to add
>>>>>>>
>>>>>>>> public class SlidingWindow extends TimeWindow {}
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>> public class SlidingWindows extends TimeWindows {} // or maybe `extends Windows`
>>>>>>>
>>>>>>> If yes, should we add corresponding public Serdes classes?
>>>>>>>
>>>>>>> Also, can you list all newly added classes/methods explicitly in the
>>>>>>> wiki?
>>>>>>>
>>>>>>>
>>>>>>> About the semantics of the operator.
>>>>>>>
>>>>>>>> "Only one single window is defined at a time,"
>>>>>>>
>>>>>>> Should this be "one window per key" instead?
>>>>>>>
>>>>>>> I agree that both window boundaries should be inclusive. However, I am
>>>>>>> not sure about:
>>>>>>>
>>>>>>>> At most one record is forwarded when new data arrives
>>>>>>>
>>>>>>> (1) In which case would no output be produced?
>>>>>>>
>>>>>>> (2) I think, if we advance in time, it can also happen that we emit
>>>>>>> multiple records. If a window "slides" (not "hops"), we cannot just
>>>>>>> advance it to the current record stream time but would need to emit
>>>>>>> more results if records expire before the current input record is
>>>>>>> added. For example, consider a window with size 5ms, and the following
>>>>>>> ts (all records have the same key):
>>>>>>>
>>>>>>> 1 2 3 10 11
>>>>>>>
>>>>>>> This should result in windows:
>>>>>>>
>>>>>>> [1]
>>>>>>> [1,2]
>>>>>>> [1,2,3]
>>>>>>> [2,3]
>>>>>>> [3]
>>>>>>> [10]
>>>>>>> [10,11]
>>>>>>>
>>>>>>> Ie, when the record with ts=10 is processed, it will trigger the
>>>>>>> computation of [2,3], [3] and [10].
>>>>>>>
>>>>>>>
>>>>>>> About out-of-order handling: I am wondering if the current design that
>>>>>>> does not allow any grace period is too restrictive. Can you elaborate
>>>>>>> more on the motivation for this suggestion?
>>>>>>>
>>>>>>>
>>>>>>> Can you give more details about the "simple design"? Atm, it's not
>>>>>>> clear to me how it works. I thought we always need to store all raw
>>>>>>> values. If we only store the current aggregate, would we end up with
>>>>>>> the same inefficient solution as using a hopping window with advance
>>>>>>> 1ms?
>>>>>>>
>>>>>>>
>>>>>>> For the O(sqrt(N)) proposal: can you maybe add an example with concrete
>>>>>>> bucket sizes, window size etc.? The current proposal is a little
>>>>>>> unclear to me, atm.
>>>>>>>
>>>>>>>
>>>>>>> How are windows advanced? Do you propose to advance all windows over
>>>>>>> all keys at the same time, or would each window (per key) advance
>>>>>>> independently of all other windows? What would be the pros/cons of both
>>>>>>> approaches?
>>>>>>>
>>>>>>>
>>>>>>> To add to Guozhang's comment: atm, DSL operators assume that aggregate
>>>>>>> functions are commutative and associative. Hence, it seems ok to make
>>>>>>> the same assumption for sliding windows. Holistic and non-subtractable
>>>>>>> aggregations should be supported out of the box at some point, too,
>>>>>>> but this would be a different KIP adding this to all existing
>>>>>>> aggregations.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 4/9/19 4:38 PM, Guozhang Wang wrote:
>>>>>>>> Hi Sophie,
>>>>>>>>
>>>>>>>> Thanks for the proposed KIP. I've made a pass over it and here are some
>>>>>>>> thoughts:
>>>>>>>>
>>>>>>>> 1. "The window size is effectively the grace and retention period". The
>>>>>>>> grace time is defined as "the time to admit late-arriving events after
>>>>>>>> the end of the window." hence it is the additional time beyond the
>>>>>>>> window size. I guess you were trying to say it should be zero?
>>>>>>>>
>>>>>>>> Also for retention period, it is not a notion of the window spec any
>>>>>>>> more, but only for the window store itself. So I'd suggest talking about
>>>>>>>> window size here, and note that store retention time cannot be
>>>>>>>> controlled via window spec at all.
>>>>>>>>
>>>>>>>> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire
>>>>>>>> a bucket, so I'd assume you will expire one bucket as a whole when its
>>>>>>>> end time is smaller than the current window's starting time, right?
>>>>>>>>
>>>>>>>> 3. Also in your algorithm how to choose "M" seems tricky, would it be a
>>>>>>>> configurable parameter exposed to users or is it abstracted away and
>>>>>>>> only being selected internally?
>>>>>>>>
>>>>>>>> 4. "There is some tradeoff between purely optimizing " seems to be an
>>>>>>>> incomplete paragraph?
>>>>>>>>
>>>>>>>> 5. Meta comment: many aggregations are commutative and associative, so
>>>>>>>> we can require users to pass in a "subtract" function as well. Given
>>>>>>>> these two functions I think we can propose two sets of APIs, 1) with
>>>>>>>> the adder and subtractor and 2) with the adder only (if the aggregate
>>>>>>>> logic is not comm. and assoc.).
>>>>>>>>
>>>>>>>> We just maintain an aggregate value for each bucket (call it
>>>>>>>> bucket_aggregate) plus for the whole window (call it total_aggregate),
>>>>>>>> i.e. at most M + 1 values per key. We use the total_aggregate for
>>>>>>>> queries, and each update will cause 2 writes (to the bucket and to the
>>>>>>>> total aggregate).
>>>>>>>>
>>>>>>>> And with 1) when expiring the oldest bucket we simply call
>>>>>>>> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
>>>>>>>> oldest bucket we can re-compute the total_aggregate by
>>>>>>>> sum(bucket_aggregate) over other buckets again.
>>>>>>>>
>>>>>>>> 6. Meta comment: it is reasonable to assume in practice out-of-order
>>>>>>>> data is not very common, hence most of the updates will be falling into
>>>>>>>> the latest bucket. So I'm wondering if it makes sense to always store
>>>>>>>> the first bucket in memory while making other buckets optionally on
>>>>>>>> persistent storage. In practice, as long as M is large enough (we
>>>>>>>> probably need it to be large enough to have sufficiently sensitive
>>>>>>>> expiration anyways) then each bucket's aggregate data is small enough
>>>>>>>> to be in memory.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I would like to kick off discussion of this KIP aimed at providing
>>>>>>>>> sliding window semantics to DSL aggregations.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>>>>>>
>>>>>>>>> Please take a look and share any thoughts you have regarding the API,
>>>>>>>>> semantics, design, etc!
>>>>>>>>>
>>>>>>>>> I also have a POC PR open with the "naive" implementation for your
>>>>>>>>> reference: https://github.com/apache/kafka/pull/6549
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Sophie
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>
> 
> 
