Thanks for the comments, Guozhang! I've answered your questions below

On Tue, Apr 9, 2019 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Sophie,
>
> Thanks for the proposed KIP. I've made a pass over it and here are some
> thoughts:
>
> 1. "The window size is effectively the grace and retention period". The
> grace time is defined as "the time to admit late-arriving events after the
> end of the window." hence it is the additional time beyond the window size.
> I guess you were trying to say it should be zero?
>
> Also for retention period, it is not a notion of the window spec any more,
> but only for the window store itself. So I'd suggest talking about window
> size here, and note that store retention time cannot be controlled via
> window spec at all.
>

Yes, I meant to say the grace period is effectively zero. The retention
period will ultimately be the same as the window size, which is
configurable, but the retention period can't be configured independently,
if that's what you mean.


> 2. In the "O(sqrt(N)) Design" you did not mention when / how to expire a
> bucket, so I'd assume you will expire one bucket as a whole when its end
> time is smaller than the current window's starting time, right?
>

Since this design assumes we don't have a subtractor, each bucket would
expire when its start time falls outside the current window; the remaining
values in that bucket are then aggregated with the "running aggregate" of
the next bucket to get the total aggregate for the entire window. I'll try
to come up with a diagram and/or a better way to explain what I have in
mind here...
(The values themselves in the buckets will expire automatically by setting
the retention period of the underlying window store)
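A rough sketch of that expiration scheme, simplified to whole-bucket expiry (it elides the partial-overlap handling of the oldest bucket described above, and all names, including the sum aggregator, are illustrative rather than from the KIP):

```python
# Sketch of the subtractor-less design: the window is split into M buckets,
# each holding a running aggregate of its own values. The full-window
# aggregate is recomputed by combining the surviving bucket aggregates;
# when the oldest bucket falls out of the window it is simply dropped, so
# we never need to "undo" an aggregation.

def add(agg, value):
    # example commutative/associative aggregator: a running sum
    return agg + value

class BucketedWindow:
    def __init__(self, num_buckets, init=0):
        self.init = init
        self.buckets = [init] * num_buckets  # per-bucket running aggregates

    def update(self, bucket_idx, value):
        # new records typically land in the latest bucket
        self.buckets[bucket_idx] = add(self.buckets[bucket_idx], value)

    def expire_oldest(self):
        # drop the oldest bucket wholesale and open a fresh one
        self.buckets.pop(0)
        self.buckets.append(self.init)

    def total(self):
        # recompute the entire-window aggregate from the remaining buckets
        agg = self.init
        for b in self.buckets:
            agg = add(agg, b)
        return agg
```

For example, three buckets holding sums 5, 7, and 2 yield a window total of 14; after expiring the oldest bucket the recomputed total is 9.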


> 3. Also in your algorithm, how to choose "M" seems tricky; would it be a
> configurable parameter exposed to users, or is it abstracted away and only
> selected internally?
>

Good question. If we ignore the difference in cost between aggregation
operations and writes to the underlying store, the optimal value of M is
sqrt(N). But in reality the aggregation might be very simple while the
RocksDB writes are expensive; conversely, the aggregation itself could be
complicated/costly while the underlying store is cheap to write to (i.e.
in-memory). I do feel it should be abstracted away from the user, however,
and not be an additional parameter they need to consider and tune (like
segmentInterval)... some profiling would probably be needed to determine a
reasonable choice
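To make the tradeoff concrete, here is a purely illustrative back-of-the-envelope cost model (the function names and cost constants are made up, not part of the KIP): if serving the total costs on the order of M combine steps while each expired bucket forces on the order of N/M re-aggregations, the rough cost is write_cost * M + agg_cost * N / M, minimized at M = sqrt(agg_cost / write_cost * N), which reduces to sqrt(N) when the two costs are equal.

```python
import math

def best_m(n, write_cost=1.0, agg_cost=1.0):
    # minimizer of cost(M) = write_cost * M + agg_cost * N / M
    return math.sqrt(agg_cost / write_cost * n)

def cost(m, n, write_cost=1.0, agg_cost=1.0):
    # rough per-window cost for a given bucket count M
    return write_cost * m + agg_cost * n / m
```

With equal unit costs and N = 10000, the minimum sits at M = 100; cheap writes relative to aggregation push the optimum toward more, smaller buckets, which is why profiling would inform the internal choice.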


> 4. "There is some tradeoff between purely optimizing " seems to be an
> incomplete paragraph?
>

Whoops


> 5. Meta comment: many aggregations are commutative and associative, so
> we can require users to pass in a "subtract" function as well. Given these
> two functions I think we can propose two sets of APIs: 1) with the adder
> and subtractor, and 2) with the adder only (if the aggregate logic is not
> comm. and assoc.).
>
> We just maintain an aggregate value for each bucket (call it
> bucket_aggregate) plus for the whole window (call it total_aggregate), i.e.
> at most M + 1 values per key. We use the total_aggregate for queries, and
> each update will cause 2 writes (to the bucket and to the total aggregate).
>
> And with 1) when expiring the oldest bucket we simply call
> subtract(total_aggregate, bucket_aggregate); with 2) when expiring the
> oldest bucket we can re-compute the total_aggregate by
> sum(bucket_aggregate) over other buckets again.
>

This is a good point, i.e. we can definitely be much smarter in our design
if we have a subtractor, in which case it's probably worth having separate
sets of APIs/implementations based on what the user can provide. I'll work
this into the KIP
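A minimal sketch of the adder-plus-subtractor variant described above, with illustrative names (again using a running sum as the example aggregator): at most M + 1 stored values per key, two writes per update, and constant-time bucket expiry.

```python
# Sketch of the adder + subtractor design: keep one running aggregate per
# bucket plus a total_aggregate for the whole window. Each update writes
# twice (to the bucket and to the total); expiring the oldest bucket is a
# single subtract(total_aggregate, bucket_aggregate) call, with no need to
# re-combine the other buckets.

class SubtractableWindow:
    def __init__(self, num_buckets, init=0):
        self.init = init
        self.buckets = [init] * num_buckets  # bucket_aggregate per bucket
        self.total = init                    # total_aggregate, serves queries

    def update(self, bucket_idx, value):
        # adder: two writes per update, one to the bucket, one to the total
        self.buckets[bucket_idx] = self.buckets[bucket_idx] + value
        self.total = self.total + value

    def expire_oldest(self):
        # subtractor: remove the oldest bucket's contribution in O(1)
        oldest = self.buckets.pop(0)
        self.total = self.total - oldest
        self.buckets.append(self.init)
```

Queries read `total` directly, which is the main win over recomputing the window aggregate from all M buckets on every expiry.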


> 6. Meta comment: it is reasonable to assume in practice out-of-order
> data is not very common, hence most of the updates will be falling into the
> latest bucket. So I'm wondering if it makes sense to always store the first
> bucket in memory while making other buckets optionally on persistent
> storage. In practice, as long as M is large enough (we probably need it to
> be large enough to have sufficiently sensitive expiration anyways) then
> each bucket's aggregate data is small enough to be in memory.
>

This sounds reasonable to me (looking ahead, if we ever want to support a
way to "tune" the total memory usage of Streams, this could be turned
on/off)


> Guozhang
>
>
>
> On Fri, Apr 5, 2019 at 7:58 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Hello all,
> >
> > I would like to kick off discussion of this KIP aimed at providing
> sliding
> > window semantics to DSL aggregations.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Please take a look and share any thoughts you have regarding the API,
> > semantics, design, etc!
> >
> > I also have a POC PR open with the "naive" implementation for your
> > reference: https://github.com/apache/kafka/pull/6549
> >
> > Cheers,
> > Sophie
> >
>
>
> --
> -- Guozhang
>
