Thanks Bruno!

The arguments for option 1 are:
1. Concise and descriptive. It avoids overloading existing functions and
it's very clear what it's doing. Imagine there's an autocomplete feature
in IntelliJ or another IDE for our DSL in the future: it would not be
favorable to show 6 `windowedBy` functions.
2. Option 1 operates on a `windowedStream` to configure how it should be
outputted. Option 2 operates on a `KGroupedStream` to produce a
`windowedStream` as well as configure how that `windowedStream` should be
outputted. I feel it's better to have a `windowedStream` first and then
configure how it is outputted. Somehow I feel option 2 breaks the
builder pattern.
3. `WindowedByParameters` doesn't seem very descriptive. If we put all
kinds of different parameters into it to avoid future overloading, it's too
bloated and not very user friendly.

I agree that option 1's `trigger` function configures the stream, which feels
different from existing operations such as `count` or `aggregate`. Configuring
might also be a kind of action on the stream :) I'm not sure whether it breaks
the DSL principle and, if it does, whether we can relax the principle given
the benefits compared to option 2. Maybe John can chime in as the DSL grammar
author.
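
To make the comparison concrete, here is a minimal, self-contained sketch of
the two call shapes. Everything in it is a toy stand-in that I made up for
illustration (`ToyGroupedStream`, `ToyWindowedStream`, `EmitPolicy`); none of
it is the real Kafka Streams API, and the window definition itself is left
out. Only the `Triggered.onWindowClose()` / `Triggered.onEachUpdate()` names
mirror what was floated in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical emit policies; not part of Kafka Streams.
enum EmitPolicy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

// Hypothetical config object in the style of `Triggered.onWindowClose()`.
final class Triggered {
    final EmitPolicy policy;
    private Triggered(EmitPolicy policy) { this.policy = policy; }
    static Triggered onWindowClose() { return new Triggered(EmitPolicy.ON_WINDOW_CLOSE); }
    static Triggered onEachUpdate() { return new Triggered(EmitPolicy.ON_EACH_UPDATE); }
}

// Toy windowed stream: a single window holding a fixed list of records.
final class ToyWindowedStream {
    private final List<String> records;
    private final EmitPolicy policy;

    ToyWindowedStream(List<String> records, EmitPolicy policy) {
        this.records = new ArrayList<>(records);
        this.policy = policy;
    }

    // Option 1: configure an existing windowed stream; the type does not change.
    ToyWindowedStream trigger(Triggered triggered) {
        return new ToyWindowedStream(records, triggered.policy);
    }

    // Returns the counts that would be emitted for the window, in order.
    List<Long> count() {
        List<Long> emitted = new ArrayList<>();
        long running = 0;
        for (String ignored : records) {
            running++;
            if (policy == EmitPolicy.ON_EACH_UPDATE) {
                emitted.add(running); // one result per update
            }
        }
        if (policy == EmitPolicy.ON_WINDOW_CLOSE && running > 0) {
            emitted.add(running); // only the final result
        }
        return emitted;
    }
}

// Toy grouped stream; the window definition argument is omitted for brevity.
final class ToyGroupedStream {
    private final List<String> records;
    ToyGroupedStream(List<String> records) { this.records = new ArrayList<>(records); }

    // Existing-style call: defaults to emitting on each update.
    ToyWindowedStream windowedBy() {
        return new ToyWindowedStream(records, EmitPolicy.ON_EACH_UPDATE);
    }

    // Option 2: an extra overload that carries the emit configuration.
    ToyWindowedStream windowedBy(Triggered parameters) {
        return new ToyWindowedStream(records, parameters.policy);
    }
}

final class TriggerSketch {
    public static void main(String[] args) {
        ToyGroupedStream grouped = new ToyGroupedStream(List.of("a", "b", "c"));
        System.out.println(grouped.windowedBy().count());                                   // [1, 2, 3]
        System.out.println(grouped.windowedBy().trigger(Triggered.onWindowClose()).count()); // [3]
        System.out.println(grouped.windowedBy(Triggered.onWindowClose()).count());           // [3]
    }
}
```

Both shapes produce the same result in this toy model. The difference is only
where the configuration lives: option 1 adds one method on the windowed
stream, while option 2 adds an overload per `windowedBy` variant.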

Thanks,
Hao

On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Hao,
>
> I agree with Guozhang: Great summary! Thank you!
>
> Regarding "aligned with other config class names", there is this DSL
> grammar John once specified
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> and we have already used it in the code. I found the grammar quite useful.
>
> I am undecided if option 1 is really worth it. What are actually the
> arguments in favor of it? Is it only that we do not need to overload
> other methods? That does not seem worth breaking DSL principles for. An
> alternative proposal would be to go with option 2 and conform with the
> grammar above:
>
> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W>
> windows, WindowedByParameters parameters);
>
> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows,
> WindowedByParameters parameters);
>
> SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows,
> WindowedByParameters parameters);
>
> This is similar to option 2 in the KIP, but it ensures that we put all
> future needed configs in the parameters object and we do not need to
> overload the methods anymore.
>
> Then if we also get KAFKA-10298 done, we could even collapse all
> `windowedBy()` methods into one.
>
> Best,
> Bruno
>
> On 22.03.22 22:31, Guozhang Wang wrote:
> > Thanks for the great summary Hao. I'm still leaning towards option 2)
> > here, and I'm in favor of `trigger` as function name, and `Triggered` as
> > config class name (mainly to be aligned with other config class names).
> > I also want to see others' preferences between the options, as well as the
> > namings.
> >
> >
> > Guozhang
> >
> >
> >
> > On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid>
> wrote:
> >
> >> `windowedStream.onWindowClose()` was the original option 1
> >> (`windowedStream.emitFinal()`) but was rejected because we could add more
> >> emit types, which would result in adding more functions. I still prefer the
> >> "windowedStream.someFunc(Controlled.onWindowClose)" model since it's
> >> flexible and clear that it's configuring the emit policy.
> >> Let me summarize all the naming options we have and compare:
> >>
> >> *API function name:*
> >>
> >> *1. `windowedStream.trigger()`*
> >>      Pros:
> >>         i. Simple
> >>         ii. Similar to Flink's trigger function (is this a con actually?)
> >>      Cons:
> >>         i. `trigger()` can be confused with Flink trigger (raised by John)
> >>         ii. `trigger()` feels like an operation instead of a configure
> >>             function (raised by Bruno)?
> >>
> >> *2. `windowedStream.emitTrigger()`*
> >>      Pros:
> >>         i. Avoids confusion with Flink's trigger API
> >>         ii. `emitTrigger` feels like configuring the trigger because
> >>             "trigger" here is a noun instead of a verb as in `trigger()`
> >>      Cons:
> >>         i. Verbose?
> >>         ii. Not consistent with `Suppressed.untilWindowClose`?
> >>
> >>
> >> *Config class/object name:*
> >>
> >> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> >>       Cons:
> >>       i. Doesn't go along with `trigger` (raised by Bruno)
> >>
> >> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> >>
> >> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> >>
> >> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> >> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> >>       This is a combination of different names like: `EmitConfig`,
> >> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> >>
> >>
> >> If we are settled with option 1), we can add new options to these names
> >> and comment on their Pros and Cons.
> >>
> >> Hao
> >>
> >>
> >>
> >>
> >>
> >> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> I see what you mean now, and I think it's a fair point that composing
> >>> `trigger` and `emitted` seems awkward.
> >>>
> >>> Re: data process operator v.s. control operator, I shared your concern as
> >>> well, and here's my train of thought: Having only data process operators
> >>> was my primary motivation for how we added the suppress operator --- it
> >>> indeed "suppresses" data. But in hindsight its disadvantage is that, for
> >>> example, Suppressed.onWindowClose() should only be related to an earlier
> >>> windowedBy operator, which is possibly very far from it in the resulting
> >>> DSL code. It's not only a bit awkward for users to write such code, but in
> >>> such cases the DSL builder also needs to maintain and propagate this
> >>> information to the suppress operator further down. So we are now thinking
> >>> about "putting the control object as close as possible to where the related
> >>> processor really happens". And in that world my original preference was
> >>> somewhere in option 2), i.e. just put the control as a param of the related
> >>> "windowedBy" operator, but the trade-off is we keep adding overloaded
> >>> functions to these operators. So after some back and forth thoughts I'm
> >>> leaning towards relaxing our principle of only having processing operators
> >>> but no flow-control operators. That being said, if you have any ideas that
> >>> would give us the benefits of both worlds, I'm all ears.
> >>>
> >>> Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> >>> "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> >>> for the latter is extensibility without adding more functions in the
> >>> future. If people feel this is not worth it we can do the first option as
> >>> well. If we just feel that `trigger` and `emitted` are not composable
> >>> together, maybe we can consider something like
> >>> `windowedStream.trigger(Triggered.onWindowClose())`?
> >>>
> >>> Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
> >>> we should use the past tense either :P But if it's not bothering people
> >>> much I'd say we just keep it rather than deprecating it and adding renamed
> >>> APIs.
> >>>
> >>>
> >>> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org>
> >> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> There is no semantic difference. It is a cosmetic difference.
> >>>> Conceptually, I relate `Emitted` with the aggregation and not with
> >>>> `trigger()` in the API flow, because the aggregation emits the result
> >>>> not `trigger()`. Therefore, I proposed to not use `Emitted` as the
> name
> >>>> of the config object passed to `trigger()`.
> >>>>
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 22.03.22 17:24, Guozhang Wang wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Could you elaborate a bit more here: what's the semantic difference
> >>>>> between "the aggregation is triggered on window close and all
> >>>>> aggregation results are emitted." for
> >>>>> trigger(TriggerParameters.onWindowClose()), and "the aggregation is
> >>>>> configured to only emit final results." for
> >>>>> trigger(Emitted.onWindowClose())?
> >>>>>
> >>>>> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Hao,
> >>>>>>
> >>>>>> Thank you for the KIP!
> >>>>>>
> >>>>>> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >>>>>> does not seem compatible with the proposed flow. Conceptually, now the
> >>>>>> flow states that the aggregation is triggered on window close and all
> >>>>>> aggregation results are emitted. `Emitted` suggests that the aggregation
> >>>>>> is configured to only emit final results.
> >>>>>>
> >>>>>> Thus, I propose the following:
> >>>>>>
> >>>>>> stream
> >>>>>>        .groupBy(..)
> >>>>>>        .windowedBy(..)
> >>>>>>        .trigger(TriggerParameters.onWindowClose())
> >>>>>>        .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>        .mapValues(..)
> >>>>>>
> >>>>>> An alternative to `trigger()` could be `schedule()`, but I do not
> >>> really
> >>>>>> like it.
> >>>>>>
> >>>>>> One thing I noticed with option 1 is that all other methods in the
> >>>>>> example above are operations on data. `groupBy()` groups, `windowedBy()`
> >>>>>> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> >>>>>> values, even `suppress()` suppresses intermediate results. But what does
> >>>>>> `trigger()` do? `trigger()` seems to be a config lost among operations.
> >>>>>>
> >>>>>> However, if we do not want to restrict ourselves to only use methods
> >>>>>> when we want to specify operations on data, I have the following
> >>>>>> proposal:
> >>>>>>
> >>>>>> stream
> >>>>>>        .groupBy(..)
> >>>>>>        .windowedBy(..)
> >>>>>>        .onWindowClose()
> >>>>>>        .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>        .mapValues(..)
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >>>>>> operations also use present tense.
> >>>>>>
> >>>>>> On 22.03.22 06:36, Hao Li wrote:
> >>>>>>> Hi John,
> >>>>>>>
> >>>>>>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> >>>>>>> different meaning in our case. `emit` sounds like an action to emit?
> >>>>>>> How about `emitTrigger`? I'm open to suggestions for the naming.
> >>>>>>>
> >>>>>>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
> >>>>>>> we can deprecate the `Suppressed` config as a whole later. Or we can
> >>>>>>> deprecate `Suppressed.untilWindowClose` in a later KIP after the
> >>>>>>> implementation of emit final is done.
> >>>>>>>
> >>>>>>> BTW, isn't
> >>>>>>>
> >>>>>>> stream
> >>>>>>>      .groupBy(..)
> >>>>>>>      .windowBy(..)
> >>>>>>>      .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>      .mapValues(..)
> >>>>>>>      // since we can trace back to the parent node to find a window definition
> >>>>>>>      .suppress(Suppressed.untilWindowClose)
> >>>>>>>
> >>>>>>> same as
> >>>>>>>
> >>>>>>> stream
> >>>>>>>      .groupBy(..)
> >>>>>>>      .windowBy(..)
> >>>>>>>      .trigger(Emitted.onWindowClose)
> >>>>>>>      .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>      .mapValues(..)
> >>>>>>> ?
> >>>>>>>
> >>>>>>> Hao
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> I think the following case is only doable via `suppress`:
> >>>>>>>>
> >>>>>>>> stream
> >>>>>>>>      .groupBy(..)
> >>>>>>>>      .windowBy(..)
> >>>>>>>>      .aggregate(..) //result in a KTable<Windowed<..>>
> >>>>>>>>      .mapValues(..)
> >>>>>>>>      // since we can trace back to the parent node to find a window definition
> >>>>>>>>      .suppress(Suppressed.untilWindowClose)
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org
> >>>
> >>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks, Guozhang!
> >>>>>>>>>
> >>>>>>>>> To clarify, I was asking specifically about deprecating just the method
> >>>>>>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though.
> >>>>>>>>> What does untilWindowClose do that this KIP doesn’t cover?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> John
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> >>>>>>>>>> Just my 2c: Suppressed is in `suppress` whose application scope is much
> >>>>>>>>>> larger and hence more flexible. I.e. it can be used anywhere for a
> >>>>>>>>>> `KTable` (but internally we would check whether certain emit policies
> >>>>>>>>>> like `untilWindowClose` are valid or not), whereas `trigger` as of now
> >>>>>>>>>> is only applicable in XXWindowedKStream. So I think it would not be
> >>>>>>>>>> completely replacing Suppressed.untilWindowClose.
> >>>>>>>>>>
> >>>>>>>>>> In the future, personally I'd still want to keep one control object
> >>>>>>>>>> for all emit policies, and maybe if we have extended Emitted for other
> >>>>>>>>>> emitting policies covered by Suppressed today, we can discuss if we
> >>>>>>>>>> could have `KTable.suppress(Emitted..)` replacing
> >>>>>>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think
> >>>>>>>>>> it's too early.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <
> >> vvcep...@apache.org
> >>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi all,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the Kip, Hao!
> >>>>>>>>>>>
> >>>>>>>>>>> For what it’s worth, I’m also in favor of your latest framing of the
> >>>>>>>>>>> API.
> >>>>>>>>>>>
> >>>>>>>>>>> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >>>>>>>>>>> identical to the concept of a trigger in Flink, which specifies when to
> >>>>>>>>>>> evaluate the window, which might be confusing to some people who have
> >>>>>>>>>>> deep experience with Flink. Then again, it seems close enough that it
> >>>>>>>>>>> should be clear to casual Flink users. For people with no other stream
> >>>>>>>>>>> processing experience, it might seem a bit esoteric compared to
> >>>>>>>>>>> something self-documenting like ‘emit()’, but the docs should make it
> >>>>>>>>>>> clear.
> >>>>>>>>>>>
> >>>>>>>>>>> One small question: it seems like this proposal is identical to
> >>>>>>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is
> >>>>>>>>>>> superior. In that case, should we deprecate Suppressed.untilWindowClose?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> John
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >>>>>>>>>>>> Hi Hao,
> >>>>>>>>>>>>
> >>>>>>>>>>>> For 2), I think it's a good idea in general to use a separate function
> >>>>>>>>>>>> on the Time/SessionWindowedKStream itself rather than overloading
> >>>>>>>>>>>> existing functions, since it achieves the same effect and, for now, the
> >>>>>>>>>>>> emitting control is only for windowed aggregations as in this KIP. We
> >>>>>>>>>>>> can discuss the actual function names further, and whether others like
> >>>>>>>>>>>> the name `trigger` or not. As for myself I feel `trigger` is a good one
> >>>>>>>>>>>> but I'd like to see if others have opinions as well.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li
> >> <h...@confluent.io.invalid
> >>>>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Guozhang,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the feedback.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. I agree to have an `Emitted` control class and two static
> >>>>>>>>>>> constructors
> >>>>>>>>>>>>> named `onWindowClose` and `onEachUpdate`.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. For the API function changes, I'm thinking of adding a new
> >>>>>>>>> function
> >>>>>>>>>>>>> called `trigger` to `TimeWindowedKStream` and
> >>>>>>>>> `SessionWindowedKStream`.
> >>>>>>>>>>> It
> >>>>>>>>>>>>> takes `Emitted` config and returns the same stream. Example:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream
> >>>>>>>>>>>>>      .groupBy(...)
> >>>>>>>>>>>>>      .windowedBy(...)
> >>>>>>>>>>>>>      .trigger(Emitted.onWindowClose) // N
> >>>>>>>>>>>>>      .count()
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The benefits are:
> >>>>>>>>>>>>>      1. It's simple and avoids creating overloads of existing
> >>>>>>>>>>>>> functions like `windowedBy` or `count`, `reduce` or `aggregate`. In
> >>>>>>>>>>>>> fact, to add it to the `aggregate` functions, we would need to add it
> >>>>>>>>>>>>> to all existing `count`, `aggregate` overloads, which is a lot.
> >>>>>>>>>>>>>      2. It operates directly on the windowed kstream and tells how its
> >>>>>>>>>>>>> output should be configured. If we later need to add this to other
> >>>>>>>>>>>>> types of streams, we can reuse the same `trigger` API, whereas other
> >>>>>>>>>>>>> types of streams/tables may not have the `aggregate` or `windowedBy`
> >>>>>>>>>>>>> APIs to keep it consistent.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <
> >>>> wangg...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I prefer option 2 over the other options mainly because the added
> >>>>>>>>>>>>>> config object could potentially be used in other operators as well
> >>>>>>>>>>>>>> (it does not necessarily have to be a windowed operator and hence be
> >>>>>>>>>>>>>> piggy-backed on `windowedBy`, and that's also why I suggested not
> >>>>>>>>>>>>>> naming it `WindowConfig` but just `EmitConfig`).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> As for Matthias' question, I think the difference between the windowed
> >>>>>>>>>>>>>> aggregate operator and the stream-stream join operator is that, for
> >>>>>>>>>>>>>> the latter we think emit-final should be the only right emitting
> >>>>>>>>>>>>>> policy and hence we should not let users configure it. If users
> >>>>>>>>>>>>>> configure it to e.g. emit eagerly they may get the old spurious
> >>>>>>>>>>>>>> emitting behavior, which violates the semantics.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a
> >>> bit
> >>>>>>>>>>>>>> towards adding the new param in the overloaded `aggregate`,
> >>> than
> >>>>>>>>> the
> >>>>>>>>>>>>>> overloaded `windowBy` function. The reason is that the
> >>> emitting
> >>>>>>>>> logic
> >>>>>>>>>>>>> could
> >>>>>>>>>>>>>> be either window based or non-window based, in the long run.
> >>>>>>>> Though
> >>>>>>>>>>> for
> >>>>>>>>>>>>>> this KIP we could just add it in
> >>>>>>>> `XXXWindowedKStream.aggregate()`,
> >>>>>>>>> we
> >>>>>>>>>>> may
> >>>>>>>>>>>>>> want to extend to other non-windowed operators in the
> >> future.
> >>>>>>>>>>>>>> 2. To be consistent with other control class names, I feel
> >>> maybe
> >>>>>>>> we
> >>>>>>>>>>> can
> >>>>>>>>>>>>>> name it "Emitted", not "EmitConfig".
> >>>>>>>>>>>>>> 3. Following the first comment, I think we can have the
> >> static
> >>>>>>>>>>>>> constructor
> >>>>>>>>>>>>>> names as "onWindowClose" and "onEachUpdate".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The resulted code pattern would be like this:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>>         .groupBy(..)
> >>>>>>>>>>>>>>         .windowBy(TimeWindow..)
> >>>>>>>>>>>>>>         .count(Emitted.onWindowClose)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <
> >>>>>>>> mj...@apache.org
> >>>>>>>>>>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in
> >>>>>>>> mind
> >>>>>>>>> is
> >>>>>>>>>>> to
> >>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>> this to control how frequently we try to emit final
> >>> results.
> >>>>>>>>>>> Maybe
> >>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>> more flexible to be used as config in properties as we
> >>> don't
> >>>>>>>>>>> need to
> >>>>>>>>>>>>>>>>> recompile DSL to change it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I see; making it a config seems better. Frankly, I am not
> >>> even
> >>>>>>>>> sure
> >>>>>>>>>>> if
> >>>>>>>>>>>>>>> we need a config at all or if we can just hard code it? For
> >>> the
> >>>>>>>>>>>>>>> stream-stream join left/outer join fix, there is only an
> >>>>>>>> internal
> >>>>>>>>>>>>> config
> >>>>>>>>>>>>>>> but no public config either.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option 1: Your proposal is?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       stream
> >>>>>>>>>>>>>>>         .groupByKey()
> >>>>>>>>>>>>>>>         .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>>>         .configure(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>         .count()
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Does not change my argument that it seems to be misplaced from an API
> >>>>>>>>>>>>>>> flow POV.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For option 2 and 3, I am not sure which one I like better. Might be
> >>>>>>>>>>>>>>> good if others could chime in, too. I think I slightly prefer option 2
> >>>>>>>>>>>>>>> over option 3.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>> Thanks for the feedback Matthias.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in
> >>> mind
> >>>>>>>>> is
> >>>>>>>>>>> to
> >>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>> this to control how frequently we try to emit final
> >> results.
> >>>>>>>>> Maybe
> >>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>> more flexible to be used as config in properties as we
> >> don't
> >>>>>>>>> need
> >>>>>>>>>>> to
> >>>>>>>>>>>>>>>> recompile DSL to change it.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how
> >>>>>>>>>>>>>>>> `TimeWindowedKStream` should be outputted to `KTable`
> >> after
> >>>>>>>>>>>>>> aggregation.
> >>>>>>>>>>>>>>>> But `emitFinal` is not an action to the
> >>> `TimeWindowedKStream`
> >>>>>>>>>>>>>> interface.
> >>>>>>>>>>>>>>>> Maybe adding `configure(EmitConfig config)` makes more
> >>> sense?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For option 2, config can be created using
> >>>>>>>>>>> `WindowConfig.emitFinal()`
> >>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>> `EmitConfig.emitFinal`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For option 3, it will be something like `TimeWindows(...,
> >>>>>>>>>>> EmitConfig
> >>>>>>>>>>>>>>>> emitConfig)`.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> For putting `EmitConfig` in aggregation operator, I think
> >> it
> >>>>>>>>>>> doesn't
> >>>>>>>>>>>>>>>> control how we do aggregation but how we output to
> >> `KTable`.
> >>>>>>>>>>> That's
> >>>>>>>>>>>>>> why I
> >>>>>>>>>>>>>>>> feel option 1 makes more sense as it applies to
> >>>>>>>>>>>>> `TimeWindowedKStream`.
> >>>>>>>>>>>>>>> But
> >>>>>>>>>>>>>>>> I'm also OK with option 2.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <
> >>>>>>>>> mj...@apache.org
> >>>>>>>>>>>>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> A general comment: it seems that we won't need any new
> >>>>>>>>>>>>>>>>> `allowedLateness` parameter because the grace-period is defined on the
> >>>>>>>>>>>>>>>>> window itself already?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe
> >>> the
> >>>>>>>>>>>>>>>>> `grace-period` is actually not a property of the window
> >> but
> >>>>>>>> a
> >>>>>>>>>>>>> property
> >>>>>>>>>>>>>>>>> of the aggregation operator? _thinking_)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>      From an API flow point of view, option 1 might not
> be
> >>>>>>>>> desirable
> >>>>>>>>>>>>>> IMHO:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        stream
> >>>>>>>>>>>>>>>>>          .groupByKey()
> >>>>>>>>>>>>>>>>>          .windowBy(TimeWindow.ofSizeNoGrace(...))
> >>>>>>>>>>>>>>>>>          .emitFinal()
> >>>>>>>>>>>>>>>>>          .count()
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The call to `emitFinal()` seems not to be in the right place for this
> >>>>>>>>>>>>>>>>> case?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few
> >>>>>>>> details
> >>>>>>>>> of
> >>>>>>>>>>> the
> >>>>>>>>>>>>>> API
> >>>>>>>>>>>>>>>>> though):
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        stream
> >>>>>>>>>>>>>>>>>          .groupByKey()
> >>>>>>>>>>>>>>>>>          .windowBy(
> >>>>>>>>>>>>>>>>>            TimeWindow.ofSizeNoGrace(...),
> >>>>>>>>>>>>>>>>>            EmitConfig.emitFinal() // just made this up; it's not in the KIP
> >>>>>>>>>>>>>>>>>          )
> >>>>>>>>>>>>>>>>>          .count()
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> >>>>>>>>>>>>>>>>> unclear what API you have in mind? `EmitFinalConfig` has no public
> >>>>>>>>>>>>>>>>> constructor nor any builder method.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you give
> >>>>>>>>>>>>>>>>> a concrete example (similar to above) of how users would write their
> >>>>>>>>>>>>>>>>> code?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Did you consider to actually pass in the `EmitConfig`
> >> into
> >>>>>>>> the
> >>>>>>>>>>>>>>>>> aggregation operator? In the end, it seems not to be
> >>>>>>>> property
> >>>>>>>>> of
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> window definition or windowing step, but a property of
> >> the
> >>>>>>>>> actual
> >>>>>>>>>>>>>>> operator:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>        stream
> >>>>>>>>>>>>>>>>>          .groupByKey()
> >>>>>>>>>>>>>>>>>          .windowBy(
> >>>>>>>>>>>>>>>>>            TimeWindow.ofSizeNoGrace(...)
> >>>>>>>>>>>>>>>>>          )
> >>>>>>>>>>>>>>>>>          .count(EmitConfig.emitFinal())
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The API surface area that need to be updated might be
> >>> larger
> >>>>>>>>> for
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> case though...
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> >>>>>>>>>>>>>>>>>> Thanks Guozhang!
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`
> >> and
> >>>>>>>>>>> option 2
> >>>>>>>>>>>>>>>>> modifies
> >>>>>>>>>>>>>>>>>> less places. What do you think of option 1 which doesn't
> >>>>>>>>> change
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> current
> >>>>>>>>>>>>>>>>>> `windowedBy` api but configures `EmitConfig` separately.
> >>>>>>>> The
> >>>>>>>>>>>>> benefit
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> option 1 is if we need to configure something else
> >> later,
> >>>>>>>> we
> >>>>>>>>>>> don't
> >>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> pile them on `windowedBy` but can add separate APIs.
> >>>>>>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >>>>>>>>>>>>>>>>>> But we can also create an internal API to do that without modifying
> >>>>>>>>>>>>>>>>>> `Stores`.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <
> >>>>>>>>>>> wangg...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hello Hao,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks for the proposal, I have some preference among
> >> the
> >>>>>>>>>>> options
> >>>>>>>>>>>>>> here
> >>>>>>>>>>>>>>>>> so I
> >>>>>>>>>>>>>>>>>>> will copy them here:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I'm now thinking if it's better to not add this new
> >>> config
> >>>>>>>>> on
> >>>>>>>>>>> each
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> Window interfaces, but instead add that at the
> >>>>>>>>>>>>>>> KGroupedStream#windowedBy
> >>>>>>>>>>>>>>>>>>> function. Also instead of adding just a boolean flag,
> >>>>>>>> maybe
> >>>>>>>>> we
> >>>>>>>>>>> can
> >>>>>>>>>>>>>>> add a
> >>>>>>>>>>>>>>>>>>> Configured class like Grouped, Suppressed, etc, e.g.
> >>> let's
> >>>>>>>>> call
> >>>>>>>>>>>>> it a
> >>>>>>>>>>>>>>>>>>> Emitted which for now would just have a single
> >> construct
> >>>>>>>> as
> >>>>>>>>>>>>>>>>>>> Emitted.atWindowClose whose semantics is the same as
> >>>>>>>>> emitFinal
> >>>>>>>>>>> ==
> >>>>>>>>>>>>>>> true.
> >>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>> think the benefits are:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1) you do not need to modify multiple Window classes,
> >> but
> >>>>>>>>> just
> >>>>>>>>>>>>>>> overload
> >>>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>> windowedBy function with a second param. This is less
> >> of
> >>> a
> >>>>>>>>>>> scope
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> now,
> >>>>>>>>>>>>>>>>>>> and also more extensible for any future changes.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2) With a config interface, we maintain its
> >> extensibility
> >>>>>>>> as
> >>>>>>>>>>> well
> >>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>> being
> >>>>>>>>>>>>>>>>>>> able to reuse this Emitted interface for other
> >> operators
> >>>>>>>> if
> >>>>>>>>> we
> >>>>>>>>>>>>>> wanted
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> expand to.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> ----------------------------
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that,
> >>>>>>>> some
> >>>>>>>>>>> more
> >>>>>>>>>>>>>>>>> detailed
> >>>>>>>>>>>>>>>>>>> comments:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1) If we want to reuse that config object for other
> >>>>>>>>> non-window
> >>>>>>>>>>>>>>> stateful
> >>>>>>>>>>>>>>>>>>> operations, I think naming it as `EmitConfig` is
> >> probably
> >>>>>>>>>>> better
> >>>>>>>>>>>>>> than
> >>>>>>>>>>>>>>>>>>> `WindowConfig`.
> >>>>>>>>>>>>>>>>>>> 2) I saw your PR (
> >>>>>>>>> https://github.com/apache/kafka/pull/11892)
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>> also proposing to add new stores into the public
> >> factory
> >>>>>>>>>>> Stores,
> >>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>> not included in the KIP. Is that intentional?
> >> Personally
> >>> I
> >>>>>>>>>>> think
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> although we may eventually want to add a new store type
> >>> to
> >>>>>>>>> the
> >>>>>>>>>>>>>> public
> >>>>>>>>>>>>>>>>> APIs,
> >>>>>>>>>>>>>>>>>>> for this KIP maybe we do not have to add them but can
> >>>>>>>> delay
> >>>>>>>>> for
> >>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>> after
> >>>>>>>>>>>>>>>>>>> we've learned the best way to layout. LMK what do you
> >>>>>>>> think?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li
> >>>>>>>>>>> <h...@confluent.io.invalid>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Dev team,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> This KIP is aimed to add new APIs to support
> >> outputting
> >>>>>>>>> final
> >>>>>>>>>>>>>>>>> aggregated
> >>>>>>>>>>>>>>>>>>>> results for windowed aggregations. I listed several
> >>>>>>>> options
> >>>>>>>>>>> there
> >>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> I'm
> >>>>>>>>>>>>>>>>>>>> looking forward to your feedback.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Hao
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >> --
> >> Thanks,
> >> Hao
> >>
> >
> >
>


-- 
Thanks,
Hao
