Hao is right. I think that's the hindsight we have for `suppress`: since it
can be applied anywhere on a K(Windowed)Table, it allows an awkward degree
of programming flexibility, and I feel it's better to make its application
scope more constrained.

I also agree with John that, unless any of us feels strongly about a
particular option, Hao should make the final call on the naming.


Guozhang

On Wed, Mar 23, 2022 at 1:49 PM Hao Li <h...@confluent.io.invalid> wrote:

> For
>
> stream
>       .groupBy(..)
>       .windowedBy(..)
>       .aggregate(..)
>       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>       .mapValues(..)
>
> I think after `aggregate` we already have a table, so the emit strategy
> comes too late to control how the windowed stream is output to the table.
> This is the same concern Guozhang raised about putting this in the
> existing `suppress` operator.
>
> Thanks,
> Hao
>
> On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Hi,
> >
> > Thank you for your answers to my questions!
> >
> > I see the argument about the conciseness of configuring a stream with
> > methods instead of config objects. I just miss the descriptive aspect a
> > bit.
> >
> > What about
> >
> > stream
> >       .groupBy(..)
> >       .windowedBy(..)
> >       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >       .aggregate(..)
> >       .mapValues(..)
> >
> > I also have another question. Why should the emitting of results be
> > controlled by the window-level API? If I want to emit a result for each
> > input record, the emit strategy is quite independent of the window. So
> > I somewhat share Matthias' and Guozhang's concern that the emit strategy
> > seems misplaced there.
> >
> > What are the arguments against the following?
> >
> > stream
> >       .groupBy(..)
> >       .windowedBy(..)
> >       .aggregate(..)
> >       .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >       .mapValues(..)
> >
> >
> > A final administrative request: Hao, could you please add the rejected
> > alternatives to the KIP so that future us will know why we rejected them?
> >
> > Best,
> > Bruno
> >
> > On 23.03.22 19:38, John Roesler wrote:
> > > Hi all,
> > >
> > > I can see both sides of this.
> > >
> > > On one hand, when we say
> > > "stream.groupBy().windowBy().count()", it seems like we're
> > > telling KS to take the raw stream, group it based on key,
> > > then window it based on time, and then compute an
> > > aggregation on the windows. In that model, "trigger()" would
> > > have to mean something like "trigger it", which doesn't
> > > really make sense, since we aren't "triggering" the
> > > aggregation (then again, to an outside observer, it would
> > > appear that way... food for thought).
> > >
> > > Another way to look at it is that all we're really doing is
> > > configuring a windowed aggregation on the stream, and we're
> > > doing it with a progressive builder interface. In other
> > > words, the above is just a progressive builder for
> > > configuring an operation like
> > > "stream.aggregate(groupingConfig, windowingConfig,
> > > countFn)". Under the latter interpretation of the DSL, it
> > > makes perfect sense to add more optional progressive builder
> > > methods like trigger() to the WindowedKStream interfaces.
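A minimal Java sketch of the second reading, assuming nothing from Kafka Streams itself: `ProgressiveBuilderSketch` and its nested `Stream`, `GroupedStream`, `WindowedStream` and `WindowedAggregation` classes are hypothetical, and configs are plain strings. Each fluent call only records configuration; the terminal `count()` turns the recorded pieces into one `aggregate(grouping, windowing, trigger, countFn)` description, with `trigger()` as an optional builder step.

```java
// Hypothetical toy model of a progressive builder; not the Kafka Streams API.
final class ProgressiveBuilderSketch {

    // The single windowed-aggregation operation that the fluent chain describes.
    static final class WindowedAggregation {
        final String grouping;
        final String windowing;
        final String trigger;
        final String aggregator;

        WindowedAggregation(String grouping, String windowing, String trigger, String aggregator) {
            this.grouping = grouping;
            this.windowing = windowing;
            this.trigger = trigger;
            this.aggregator = aggregator;
        }

        @Override
        public String toString() {
            return "aggregate(" + grouping + ", " + windowing + ", " + trigger + ", " + aggregator + ")";
        }
    }

    static final class Stream {
        GroupedStream groupBy(String grouping) {
            return new GroupedStream(grouping);
        }
    }

    static final class GroupedStream {
        private final String grouping;

        GroupedStream(String grouping) {
            this.grouping = grouping;
        }

        WindowedStream windowBy(String windowing) {
            return new WindowedStream(grouping, windowing);
        }
    }

    static final class WindowedStream {
        private final String grouping;
        private final String windowing;
        private String trigger = "default"; // used when trigger() is never called

        WindowedStream(String grouping, String windowing) {
            this.grouping = grouping;
            this.windowing = windowing;
        }

        // Optional step: only records config and returns the same builder stage.
        WindowedStream trigger(String policy) {
            this.trigger = policy;
            return this;
        }

        // Terminal step: the recorded configs become one operation.
        WindowedAggregation count() {
            return new WindowedAggregation(grouping, windowing, trigger, "countFn");
        }
    }

    public static void main(String[] args) {
        WindowedAggregation op = new Stream()
            .groupBy("byKey")
            .windowBy("tumbling(5m)")
            .trigger("onWindowClose")
            .count();
        System.out.println(op);
    }
}
```

In this model, leaving out `trigger()` simply keeps the default, which is the sense in which an optional builder method does not change what the rest of the chain means.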
> > >
> > > Since part of the motivation for choosing the word "trigger"
> > > here is to stay close to what Flink defines, I'll also point
> > > out that Flink's syntax is also
> > > "stream.keyBy().window().trigger().aggregate()". Not that
> > > their API is the holy grail or anything, but it's at least
> > > an indication that this API isn't a horrible mistake.
> > >
> > > All other things being equal, I also prefer to leave tie-
> > > breakers in the hands of the contributor. So, if we've all
> > > said our piece and Hao still prefers option 1, then (as long
> > > as we don't think it's a horrible mistake), I think we
> > > should just let him go for it.
> > >
> > > Speaking of which, after reviewing the responses regarding
> > > deprecating `Suppressed#untilWindowClose`, I still think we
> > > should just go ahead and deprecate it. Although it's not
> > > expressed exactly the same way, it still does exactly the
> > > same thing, or so close that it seems confusing to keep
> > > both. But again, if Hao really prefers to keep both, I won't
> > > insist on it :)
> > >
> > > Thanks all,
> > > -John
> > >
> > > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> > >> Thanks Bruno!
> > >>
> > >> Arguments for option 1:
> > >> 1. Concise and descriptive. It avoids overloading existing functions,
> > >> and it's very clear what it's doing. Imagine an autocomplete feature
> > >> for our DSL in IntelliJ or another IDE in the future: it would not be
> > >> nice to show six `windowedBy` functions.
> > >> 2. Option 1 operates on `windowedStream` to configure how it should be
> > >> output. Option 2 operates on `KGroupedStream` to produce a
> > >> `windowedStream` and also configure how that `windowedStream` should
> > >> be output. I feel it's better to have a `windowedStream` first and
> > >> then configure how it is output. Somehow I feel option 2 breaks the
> > >> builder pattern.
> > >> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> > >> kinds of different parameters into it to avoid future overloading,
> > >> it becomes bloated and not very user-friendly.
> > >>
> > >> I agree that option 1's `trigger` function configures the stream,
> > >> which feels different from existing operations like `count` or
> > >> `aggregate`. Configuring might also be a kind of action on a stream :)
> > >> I'm not sure if it breaks the DSL principles, and if it does, can we
> > >> relax the principles given the benefits compared to option 2? Maybe
> > >> John can chime in as the DSL grammar author.
> > >>
> > >> Thanks,
> > >> Hao
> > >>
> > >> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>
> > >>> Hi Hao,
> > >>>
> > >>> I agree with Guozhang: Great summary! Thank you!
> > >>>
> > >>> Regarding "aligned with other config class names", there is this DSL
> > >>> grammar John once specified
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> > >>> and we have already used it in the code. I found the grammar quite
> > >>> useful.
> > >>>
> > >>> I am undecided whether option 1 is really worth it. What are actually
> > >>> the arguments in favor of it? Is it only that we do not need to
> > >>> overload other methods? That does not seem worth breaking the DSL
> > >>> principles for. An alternative proposal would be to go with option 2
> > >>> and conform to the grammar above:
> > >>>
> > >>> <W extends Window> TimeWindowedKStream<K, V> windowedBy(
> > >>>     final Windows<W> windows, final WindowedByParameters parameters);
> > >>>
> > >>> TimeWindowedKStream<K, V> windowedBy(
> > >>>     final SlidingWindows windows, final WindowedByParameters parameters);
> > >>>
> > >>> SessionWindowedKStream<K, V> windowedBy(
> > >>>     final SessionWindows windows, final WindowedByParameters parameters);
> > >>>
> > >>> This is similar to option 2 in the KIP, but it ensures that we put all
> > >>> future configs in the parameters object, so we do not need to overload
> > >>> the methods anymore.
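A sketch of the parameter-object idea, assuming a plain Java model rather than the real DSL: `ParameterObjectSketch`, `EmitPolicy`, the `withEmitPolicy`/`withName` methods and the `name` field are illustrative only, with `name` standing in for any future setting. Adding a setting means adding a field and a `with...` method, while the `windowedBy` signature stays fixed.

```java
import java.time.Duration;

// Hypothetical parameter-object sketch; `WindowedByParameters` is the name proposed
// in this thread, not an existing Kafka Streams class.
final class ParameterObjectSketch {

    enum EmitPolicy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // Immutable holder: a new optional setting becomes a new field, not a new overload.
    static final class WindowedByParameters {
        final EmitPolicy emitPolicy;
        final String name; // stand-in for any future optional setting

        private WindowedByParameters(EmitPolicy emitPolicy, String name) {
            this.emitPolicy = emitPolicy;
            this.name = name;
        }

        static WindowedByParameters defaults() {
            return new WindowedByParameters(EmitPolicy.ON_EACH_UPDATE, null);
        }

        WindowedByParameters withEmitPolicy(EmitPolicy policy) {
            return new WindowedByParameters(policy, name);
        }

        WindowedByParameters withName(String newName) {
            return new WindowedByParameters(emitPolicy, newName);
        }
    }

    // One method per window type suffices; everything optional lives in the parameters.
    static String windowedBy(Duration windowSize, WindowedByParameters parameters) {
        String description = "window=" + windowSize.toMinutes() + "m, emit=" + parameters.emitPolicy;
        return parameters.name == null ? description : description + ", name=" + parameters.name;
    }

    public static void main(String[] args) {
        WindowedByParameters params =
            WindowedByParameters.defaults().withEmitPolicy(EmitPolicy.ON_WINDOW_CLOSE);
        System.out.println(windowedBy(Duration.ofMinutes(5), params));
    }
}
```

The cost is the one Hao points out in this thread: every future option ends up in the same class.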
> > >>>
> > >>> Then if we also get KAFKA-10298 done, we could even collapse all
> > >>> `windowedBy()` methods into one.
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 22.03.22 22:31, Guozhang Wang wrote:
> > >>>> Thanks for the great summary, Hao. I'm still leaning towards option
> > >>>> 2) here, and I'm in favor of `trigger` as the function name and
> > >>>> `Triggered` as the config class name (mainly to be aligned with other
> > >>>> config class names). I'd also like to see others' preferences between
> > >>>> the options, as well as on the naming.
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> `windowedStream.onWindowClose()` is essentially the original option 1
> > >>>>> (`windowedStream.emitFinal()`), which was rejected because we could
> > >>>>> add more emit types, and each one would require adding another
> > >>>>> function. I still prefer the
> > >>>>> "windowedStream.someFunc(Controlled.onWindowClose)" model, since it's
> > >>>>> flexible and makes clear that it's configuring the emit policy.
> > >>>>> Let me summarize all the naming options we have and compare:
> > >>>>>
> > >>>>> *API function name:*
> > >>>>>
> > >>>>> *1. `windowedStream.trigger()`*
> > >>>>>       Pros:
> > >>>>>          i. Simple
> > >>>>>          ii. Similar to Flink's trigger function (is this actually a con?)
> > >>>>>       Cons:
> > >>>>>          i. `trigger()` can be confused with Flink's trigger (raised by John)
> > >>>>>          ii. `trigger()` feels like an operation instead of a
> > >>>>>              configuration function (raised by Bruno)?
> > >>>>>
> > >>>>> *2. `windowedStream.emitTrigger()`*
> > >>>>>       Pros:
> > >>>>>          i. Avoids confusion with Flink's trigger API
> > >>>>>          ii. `emitTrigger` feels like configuring the trigger, because
> > >>>>>              "trigger" here is a noun rather than a verb as in `trigger()`
> > >>>>>       Cons:
> > >>>>>          i. Verbose?
> > >>>>>          ii. Not consistent with `Suppressed.untilWindowClose`?
> > >>>>>
> > >>>>>
> > >>>>> *Config class/object name:*
> > >>>>>
> > >>>>> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> > >>>>>        Cons:
> > >>>>>        i. Doesn't go along with `trigger` (raised by Bruno)
> > >>>>>
> > >>>>> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> > >>>>>
> > >>>>> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> > >>>>>
> > >>>>> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> > >>>>> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> > >>>>>        This is a combination of different names like: `EmitConfig`,
> > >>>>> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> > >>>>>
> > >>>>>
> > >>>>> If we settle on option 1), we can add new candidates to these names
> > >>>>> and comment on their pros and cons.
> > >>>>>
> > >>>>> Hao
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>
> > >>>>>> I see what you mean now, and I think it's a fair point that
> > >>>>>> composing `trigger` and `emitted` seems awkward.
> > >>>>>>
> > >>>>>> Re: data-processing operators vs. control operators, I shared your
> > >>>>>> concern as well, and here's my train of thought: having only
> > >>>>>> data-processing operators was my primary motivation for how we added
> > >>>>>> the suppress operator --- it indeed "suppresses" data. But in
> > >>>>>> hindsight its disadvantage is that, for example,
> > >>>>>> Suppressed.untilWindowClose() relates only to an earlier windowedBy
> > >>>>>> operator, which may be very far from it in the resulting DSL code.
> > >>>>>> Not only is such code a bit awkward for users to write, but in such
> > >>>>>> cases the DSL builder also needs to maintain this information and
> > >>>>>> propagate it down to the suppress operator. So we are now thinking
> > >>>>>> about "putting the control object as close as possible to where the
> > >>>>>> related processing really happens". In that world my original
> > >>>>>> preference was somewhere in option 2), i.e. just making the control
> > >>>>>> a param of the related "windowedBy" operator, but the trade-off is
> > >>>>>> that we keep adding overloaded functions to these operators. So after
> > >>>>>> some back-and-forth I'm leaning towards relaxing our principle of
> > >>>>>> having only processing operators and no flow-control operators. That
> > >>>>>> being said, if you have any ideas for getting the benefits of both
> > >>>>>> worlds, I'm all ears.
> > >>>>>>
> > >>>>>> Re: using a direct function like "windowedStream.onWindowClose()"
> > >>>>>> vs. "windowedStream.someFunc(Controlled.onWindowClose)", again my
> > >>>>>> motivation for the latter is extensibility without adding more
> > >>>>>> functions in the future. If people feel this is not worth it, we can
> > >>>>>> go with the first option as well. If we just feel that `trigger` and
> > >>>>>> `emitted` are not composable together, maybe we can consider
> > >>>>>> something like `windowedStream.trigger(Triggered.onWindowClose())`?
> > >>>>>>
> > >>>>>> Re: windowedBy vs. windowBy, yeah, I do not really have a good
> > >>>>>> reason why we used the past tense either :P But if it's not bothering
> > >>>>>> people much, I'd say we just keep it rather than deprecating and
> > >>>>>> renaming to new APIs.
> > >>>>>>
> > >>>>>>
> > >>>>>> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>>>>>
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> There is no semantic difference. It is a cosmetic difference.
> > >>>>>>> Conceptually, I associate `Emitted` with the aggregation and not
> > >>>>>>> with `trigger()` in the API flow, because the aggregation emits the
> > >>>>>>> result, not `trigger()`. Therefore, I proposed not to use `Emitted`
> > >>>>>>> as the name of the config object passed to `trigger()`.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Bruno
> > >>>>>>>
> > >>>>>>> On 22.03.22 17:24, Guozhang Wang wrote:
> > >>>>>>>> Hi Bruno,
> > >>>>>>>>
> > >>>>>>>> Could you elaborate a bit more on the semantic difference between
> > >>>>>>>> "the aggregation is triggered on window close and all aggregation
> > >>>>>>>> results are emitted" for trigger(TriggerParameters.onWindowClose()),
> > >>>>>>>> and "the aggregation is configured to only emit final results" for
> > >>>>>>>> trigger(Emitted.onWindowClose())?
> > >>>>>>>>
> > >>>>>>>> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Hao,
> > >>>>>>>>>
> > >>>>>>>>> Thank you for the KIP!
> > >>>>>>>>>
> > >>>>>>>>> Regarding option 1, I would not use `Emitted.onWindowClose()`,
> > >>>>>>>>> since that does not seem compatible with the proposed flow.
> > >>>>>>>>> Conceptually, the flow now states that the aggregation is triggered
> > >>>>>>>>> on window close and all aggregation results are emitted. `Emitted`
> > >>>>>>>>> suggests that the aggregation is configured to only emit final
> > >>>>>>>>> results.
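To make the two emit behaviors concrete, here is a toy, single-key Java simulation of a tumbling-window count under both policies. It is an illustration under stated assumptions, not Kafka Streams behavior: `EmitPolicySketch` and `Policy` are hypothetical, times are plain `long` values, stream time is the largest timestamp seen, and a window `[start, start + size)` is treated as closed once stream time reaches `start + size + grace`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy single-key simulation of two emit policies for a tumbling-window count.
// Not Kafka Streams code: times are plain longs and a window [start, start + size)
// is treated as closed once stream time >= start + size + grace.
final class EmitPolicySketch {

    enum Policy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    static List<String> run(long[] timestamps, long size, long grace, Policy policy) {
        Map<Long, Long> counts = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - Math.floorMod(ts, size);
            if (streamTime < start + size + grace) {
                long count = counts.merge(start, 1L, Long::sum);
                if (policy == Policy.ON_EACH_UPDATE) {
                    emitted.add("[" + start + "]=" + count); // every update goes downstream
                }
            } // else: late record for an already-closed window, dropped

            if (policy == Policy.ON_WINDOW_CLOSE) {
                // Emit each window once, with its final count, when it closes.
                Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Long> window = it.next();
                    if (streamTime >= window.getKey() + size + grace) {
                        emitted.add("[" + window.getKey() + "]=" + window.getValue());
                        it.remove();
                    }
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 5, 12, 15, 3, 25};
        System.out.println(run(timestamps, 10, 0, Policy.ON_EACH_UPDATE));
        System.out.println(run(timestamps, 10, 0, Policy.ON_WINDOW_CLOSE));
    }
}
```

With timestamps `{1, 5, 12, 15, 3, 25}`, size 10 and grace 0, `ON_EACH_UPDATE` emits `[[0]=1, [0]=2, [10]=1, [10]=2, [20]=1]`, while `ON_WINDOW_CLOSE` emits only `[[0]=2, [10]=2]`. The late record at time 3 is dropped in both cases, and window 20 is still open at the end.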
> > >>>>>>>>>
> > >>>>>>>>> Thus, I propose the following:
> > >>>>>>>>>
> > >>>>>>>>> stream
> > >>>>>>>>>         .groupBy(..)
> > >>>>>>>>>         .windowedBy(..)
> > >>>>>>>>>         .trigger(TriggerParameters.onWindowClose())
> > >>>>>>>>>         .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>>>>>>         .mapValues(..)
> > >>>>>>>>>
> > >>>>>>>>> An alternative to `trigger()` could be `schedule()`, but I do not
> > >>>>>>>>> really like it.
> > >>>>>>>>>
> > >>>>>>>>> One thing I noticed with option 1 is that all other methods in the
> > >>>>>>>>> example above are operations on data. `groupBy()` groups,
> > >>>>>>>>> `windowedBy()` partitions, `aggregate()` computes the aggregate,
> > >>>>>>>>> `mapValues()` maps values, and even `suppress()` suppresses
> > >>>>>>>>> intermediate results. But what does `trigger()` do? `trigger()`
> > >>>>>>>>> seems like a config lost among operations.
> > >>>>>>>>>
> > >>>>>>>>> However, if we do not want to restrict ourselves to using methods
> > >>>>>>>>> only to specify operations on data, I have the following proposal:
> > >>>>>>>>>
> > >>>>>>>>> stream
> > >>>>>>>>>         .groupBy(..)
> > >>>>>>>>>         .windowedBy(..)
> > >>>>>>>>>         .onWindowClose()
> > >>>>>>>>>         .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>>>>>>         .mapValues(..)
> > >>>>>>>>>
> > >>>>>>>>> Best,
> > >>>>>>>>> Bruno
> > >>>>>>>>>
> > >>>>>>>>> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> > >>>>>>>>> operations also use present tense.
> > >>>>>>>>>
> > >>>>>>>>> On 22.03.22 06:36, Hao Li wrote:
> > >>>>>>>>>> Hi John,
> > >>>>>>>>>>
> > >>>>>>>>>> Yes. For naming, `trigger` is similar to Flink's trigger, but it
> > >>>>>>>>>> has a different meaning in our case. `emit` sounds like an action
> > >>>>>>>>>> that emits something? How about `emitTrigger`? I'm open to
> > >>>>>>>>>> suggestions for the naming.
> > >>>>>>>>>>
> > >>>>>>>>>> Regarding deprecating `Suppressed.untilWindowClose`, I agree with
> > >>>>>>>>>> Guozhang that we can deprecate the `Suppressed` config as a whole
> > >>>>>>>>>> later. Alternatively, we can deprecate `Suppressed.untilWindowClose`
> > >>>>>>>>>> in a later KIP once the emit-final implementation is done.
> > >>>>>>>>>>
> > >>>>>>>>>> BTW, isn't
> > >>>>>>>>>>
> > >>>>>>>>>> stream
> > >>>>>>>>>>       .groupBy(..)
> > >>>>>>>>>>       .windowBy(..)
> > >>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>>>>>>>       .mapValues(..)
> > >>>>>>>>>>       .suppress(Suppressed.untilWindowClose) // since we can trace back
> > >>>>>>>>>>       // to the parent node to find a window definition
> > >>>>>>>>>>
> > >>>>>>>>>> same as
> > >>>>>>>>>>
> > >>>>>>>>>> stream
> > >>>>>>>>>>       .groupBy(..)
> > >>>>>>>>>>       .windowBy(..)
> > >>>>>>>>>>       .trigger(Emitted.onWindowClose)
> > >>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>>>>>>>       .mapValues(..)
> > >>>>>>>>>> ?
> > >>>>>>>>>>
> > >>>>>>>>>> Hao
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> I think the following case is only doable via `suppress`:
> > >>>>>>>>>>>
> > >>>>>>>>>>> stream
> > >>>>>>>>>>>       .groupBy(..)
> > >>>>>>>>>>>       .windowBy(..)
> > >>>>>>>>>>>       .aggregate(..) //result in a KTable<Windowed<..>>
> > >>>>>>>>>>>       .mapValues(..)
> > >>>>>>>>>>>       .suppress(Suppressed.untilWindowClose) // since we can trace back
> > >>>>>>>>>>>       // to the parent node to find a window definition
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Thanks, Guozhang!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> To clarify, I was asking specifically about deprecating just the
> > >>>>>>>>>>>> method ‘untilWindowClose’. I might not be thinking clearly about
> > >>>>>>>>>>>> it, though. What does untilWindowClose do that this KIP doesn’t
> > >>>>>>>>>>>> cover?
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>> John
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > >>>>>>>>>>>>> Just my 2c: `Suppressed` is used in `suppress`, whose
> > >>>>>>>>>>>>> application scope is much larger and hence more flexible,
> > >>>>>>>>>>>>> i.e. it can be used anywhere on a `KTable` (though internally
> > >>>>>>>>>>>>> we would check whether certain emit policies like
> > >>>>>>>>>>>>> `untilWindowClose` are valid), whereas `trigger` is, for now,
> > >>>>>>>>>>>>> only applicable to XXWindowedKStream. So I don't think it
> > >>>>>>>>>>>>> would completely replace Suppressed.untilWindowClose.
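The internal validity check mentioned here can be pictured as a walk up the parent chain of the topology. A minimal sketch, assuming a toy plan representation: `ParentLookupSketch`, `Node` and `validateUntilWindowClose` are hypothetical names, and Kafka Streams' real builder is not modeled.

```java
// Toy logical plan in which every node points to its parent, as a DSL builder might
// record it. Hypothetical names; not how Kafka Streams' internal builder is written.
final class ParentLookupSketch {

    static final class Node {
        final String name;
        final Node parent;
        final boolean windowedAggregation;

        Node(String name, Node parent, boolean windowedAggregation) {
            this.name = name;
            this.parent = parent;
            this.windowedAggregation = windowedAggregation;
        }
    }

    // A suppress-style operator may sit anywhere below the aggregation,
    // so an "until window close" policy is validated by walking up the parents.
    static String validateUntilWindowClose(Node attachedTo) {
        for (Node n = attachedTo; n != null; n = n.parent) {
            if (n.windowedAggregation) {
                return n.name;
            }
        }
        throw new IllegalArgumentException(
            "until-window-close needs an upstream windowed aggregation");
    }

    public static void main(String[] args) {
        Node source = new Node("source", null, false);
        Node grouped = new Node("groupBy", source, false);
        Node windowed = new Node("windowedAggregate", grouped, true);
        Node mapped = new Node("mapValues", windowed, false);
        System.out.println(validateUntilWindowClose(mapped)); // prints windowedAggregate
    }
}
```

When the emit setting is instead attached to the windowed stream itself, the aggregation node receives the policy directly and this lookup is unnecessary; that is the trade-off being weighed in this thread.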
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> In the future, I'd personally still like to keep a single
> > >>>>>>>>>>>>> control object for all emit policies. If we later extend
> > >>>>>>>>>>>>> Emitted to cover the other emit policies that Suppressed
> > >>>>>>>>>>>>> covers today, we can discuss whether
> > >>>>>>>>>>>>> `KTable.suppress(Emitted..)` could replace
> > >>>>>>>>>>>>> `KTable.suppress(Suppressed..)` as a whole, but for this KIP I
> > >>>>>>>>>>>>> think it's too early.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi all,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks for the KIP, Hao!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> For what it’s worth, I’m also in favor of your latest framing
> > >>>>>>>>>>>>>> of the API.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I think the name is fine. I assume it’s inspired by Flink?
> > >>>>>>>>>>>>>> It’s not identical to the concept of a trigger in Flink, which
> > >>>>>>>>>>>>>> specifies when to evaluate the window, so it might be
> > >>>>>>>>>>>>>> confusing to some people with deep Flink experience. Then
> > >>>>>>>>>>>>>> again, it seems close enough that it should be clear to casual
> > >>>>>>>>>>>>>> Flink users. For people with no other stream-processing
> > >>>>>>>>>>>>>> experience, it might seem a bit esoteric compared to something
> > >>>>>>>>>>>>>> self-documenting like ‘emit()’, but the docs should make it
> > >>>>>>>>>>>>>> clear.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> One small question: it seems like this proposal is identical to
> > >>>>>>>>>>>>>> Suppressed.untilWindowClose, and the KIP states that this API is
> > >>>>>>>>>>>>>> superior. In that case, should we deprecate
> > >>>>>>>>>>>>>> Suppressed.untilWindowClose?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>> John
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >>>>>>>>>>>>>>> Hi Hao,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> For 2), I think it's a good idea in general to use a separate
> > >>>>>>>>>>>>>>> function on the Time/SessionWindowedKStream itself rather than
> > >>>>>>>>>>>>>>> overloading existing functions; it achieves the same effect,
> > >>>>>>>>>>>>>>> namely that, for now, emit control applies only to windowed
> > >>>>>>>>>>>>>>> aggregations as in this KIP. We can discuss the actual
> > >>>>>>>>>>>>>>> function names further, e.g. whether others like the name
> > >>>>>>>>>>>>>>> `trigger`. I personally feel `trigger` is a good one, but I'd
> > >>>>>>>>>>>>>>> like to see if others have opinions as well.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks for the feedback.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 1. I agree on having an `Emitted` control class with two
> > >>>>>>>>>>>>>>>> static constructors named `onWindowClose` and `onEachUpdate`.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> 2. For the API function changes, I'm thinking of adding a new
> > >>>>>>>>>>>>>>>> function called `trigger` to `TimeWindowedKStream` and
> > >>>>>>>>>>>>>>>> `SessionWindowedKStream`. It takes an `Emitted` config and
> > >>>>>>>>>>>>>>>> returns the same stream. Example:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> stream
> > >>>>>>>>>>>>>>>>       .groupBy(...)
> > >>>>>>>>>>>>>>>>       .windowedBy(...)
> > >>>>>>>>>>>>>>>>       .trigger(Emitted.onWindowClose())
> > >>>>>>>>>>>>>>>>       .count()
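A sketch of the control-class and method shapes proposed here, assuming plain Java stand-ins: `EmittedSketch`, `Emitted`, `Strategy` and `TimeWindowedStream` are hypothetical, and the `onEachUpdate()` default is an assumption. The private constructor forces callers through the named static constructors, and `trigger` returns the same stream so the chain can continue to `count()`.

```java
// Hypothetical shape of the proposed `Emitted` control class and `trigger` method;
// neither is an existing Kafka Streams API.
final class EmittedSketch {

    static final class Emitted {
        enum Strategy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

        final Strategy strategy;

        // Private constructor: callers must go through the named static constructors,
        // so new strategies can be added later without new DSL methods.
        private Emitted(Strategy strategy) {
            this.strategy = strategy;
        }

        static Emitted onWindowClose() {
            return new Emitted(Strategy.ON_WINDOW_CLOSE);
        }

        static Emitted onEachUpdate() {
            return new Emitted(Strategy.ON_EACH_UPDATE);
        }
    }

    static final class TimeWindowedStream {
        private Emitted emitted = Emitted.onEachUpdate(); // assumed default

        // Takes an Emitted config and returns the same stream.
        TimeWindowedStream trigger(Emitted emitted) {
            this.emitted = emitted;
            return this;
        }

        String count() {
            return "count emitting " + emitted.strategy;
        }
    }

    public static void main(String[] args) {
        System.out.println(new TimeWindowedStream().trigger(Emitted.onWindowClose()).count());
    }
}
```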
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The benefits are:
> > >>>>>>>>>>>>>>>>       1. It's simple and avoids overloading existing
> > >>>>>>>>>>>>>>>> functions like `windowedBy`, `count`, `reduce` or `aggregate`.
> > >>>>>>>>>>>>>>>> In fact, to add it to the `aggregate` functions, we would need
> > >>>>>>>>>>>>>>>> to add it to all existing `count` and `aggregate` overloads,
> > >>>>>>>>>>>>>>>> which is a lot.
> > >>>>>>>>>>>>>>>>       2. It operates directly on the windowed KStream and
> > >>>>>>>>>>>>>>>> specifies how its output should be configured. If we later
> > >>>>>>>>>>>>>>>> need to add this to other types of streams, we can reuse the
> > >>>>>>>>>>>>>>>> same `trigger` API, whereas other types of streams/tables may
> > >>>>>>>>>>>>>>>> not have an `aggregate` or `windowedBy` API to add it to
> > >>>>>>>>>>>>>>>> consistently.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hello Hao,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I prefer option 2 over the other options mainly because the
> > >>>>>>>>>>>>>>>>> added config object could potentially be used in other
> > >>>>>>>>>>>>>>>>> operators as well (not necessarily windowed ones, so it need
> > >>>>>>>>>>>>>>>>> not be piggy-backed on `windowedBy`; that's also why I
> > >>>>>>>>>>>>>>>>> suggested not naming it `WindowConfig` but just `EmitConfig`).
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> As for Matthias' question, I think the difference between the
> > >>>>>>>>>>>>>>>>> windowed aggregate operator and the stream-stream join
> > >>>>>>>>>>>>>>>>> operator is that, for the latter, we think emit-final is the
> > >>>>>>>>>>>>>>>>> only correct emitting policy, and hence we should not let
> > >>>>>>>>>>>>>>>>> users configure it. If users configured it to, e.g., emit
> > >>>>>>>>>>>>>>>>> eagerly, they might get the old spurious emitting behavior,
> > >>>>>>>>>>>>>>>>> which violates the semantics.
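The spurious-result problem can be shown with a toy single-key left join. A minimal sketch, assuming a symmetric join window, zero grace, and stream time equal to the largest timestamp seen; `LeftJoinEmitSketch` and `Event` are hypothetical, and this is not Kafka's join implementation. With eager emission, a left record without a partner is emitted as `value,null` immediately; with emit-final, that result is held until the join window has passed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy single-key left join with a symmetric join window and zero grace.
// Hypothetical illustration of eager vs. emit-final output; not Kafka's join code.
final class LeftJoinEmitSketch {

    static final class Event {
        final boolean left;
        final String value;
        final long ts;

        Event(boolean left, String value, long ts) {
            this.left = left;
            this.value = value;
            this.ts = ts;
        }
    }

    static List<String> run(List<Event> events, long window, boolean emitFinal) {
        List<Event> lefts = new ArrayList<>();
        List<Event> rights = new ArrayList<>();
        List<Event> unmatched = new ArrayList<>(); // left records still waiting for a partner
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (Event e : events) {
            streamTime = Math.max(streamTime, e.ts);
            boolean matched = false;
            for (Event other : e.left ? rights : lefts) {
                if (Math.abs(other.ts - e.ts) <= window) {
                    Event l = e.left ? e : other;
                    Event r = e.left ? other : e;
                    out.add(l.value + "," + r.value);
                    unmatched.remove(l); // a deferred left record found its partner
                    matched = true;
                }
            }
            if (e.left) {
                lefts.add(e);
                if (!matched && emitFinal) {
                    unmatched.add(e);           // defer the left-only result
                } else if (!matched) {
                    out.add(e.value + ",null"); // eager: may turn out to be spurious
                }
            } else {
                rights.add(e);
            }
            if (emitFinal) {
                // Once stream time passes the join window, no partner can arrive anymore.
                Iterator<Event> it = unmatched.iterator();
                while (it.hasNext()) {
                    Event l = it.next();
                    if (streamTime > l.ts + window) {
                        out.add(l.value + ",null");
                        it.remove();
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(true, "a", 1), new Event(false, "x", 3),
            new Event(true, "b", 10), new Event(false, "y", 20));
        System.out.println(run(events, 5, false)); // eager
        System.out.println(run(events, 5, true));  // emit-final
    }
}
```

For left `a`@1, right `x`@3, left `b`@10, right `y`@20 and window 5, eager emission yields `[a,null, a,x, b,null]`, where `a,null` is spurious, while emit-final yields `[a,x, b,null]`.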
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also leaning a
> > >>>>>>>>>>>>>>>>> bit towards adding the new param to the overloaded
> > >>>>>>>>>>>>>>>>> `aggregate` rather than to the overloaded `windowBy`
> > >>>>>>>>>>>>>>>>> function. The reason is that, in the long run, the emitting
> > >>>>>>>>>>>>>>>>> logic could be either window-based or non-window-based.
> > >>>>>>>>>>>>>>>>> Though for this KIP we could just add it to
> > >>>>>>>>>>>>>>>>> `XXXWindowedKStream.aggregate()`, we may want to extend it
> > >>>>>>>>>>>>>>>>> to other non-windowed operators in the future.
> > >>>>>>>>>>>>>>>>> 2. To be consistent with other control class names, maybe we
> > >>>>>>>>>>>>>>>>> can name it "Emitted" rather than "EmitConfig".
> > >>>>>>>>>>>>>>>>> 3. Following the first comment, I think we can name the
> > >>>>>>>>>>>>>>>>> static constructors "onWindowClose" and "onEachUpdate".
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> The resulting code pattern would look like this:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>        stream
> > >>>>>>>>>>>>>>>>>          .groupBy(..)
> > >>>>>>>>>>>>>>>>>          .windowBy(TimeWindow..)
> > >>>>>>>>>>>>>>>>>          .count(Emitted.onWindowClose)
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> WDYT?
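For comparison with the `trigger` approach, a sketch of this overload-based pattern, assuming simplified stand-ins: `OverloadSketch`, the `Emitted` enum (used here instead of a class with static constructors) and `TimeWindowedStream` are hypothetical, and `ON_EACH_UPDATE` as the default is an assumption. The existing no-argument overload keeps its behavior by delegating with the default.

```java
// Hypothetical sketch of the overload-based alternative: the emit policy becomes a
// parameter of the terminal operation. `Emitted` is a stand-in enum, not a Kafka class.
final class OverloadSketch {

    enum Emitted { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    static final class TimeWindowedStream {
        // The existing signature keeps its behavior by delegating with an assumed default.
        String count() {
            return count(Emitted.ON_EACH_UPDATE);
        }

        // New overload; each existing count/reduce/aggregate variant would need a twin like this.
        String count(Emitted emitted) {
            return "count[" + emitted + "]";
        }
    }

    public static void main(String[] args) {
        TimeWindowedStream windowed = new TimeWindowedStream();
        System.out.println(windowed.count());
        System.out.println(windowed.count(Emitted.ON_WINDOW_CLOSE));
    }
}
```

The delegation keeps existing code working, but each existing `count`, `reduce` and `aggregate` variant needs such a twin, which is the overload growth the `trigger` alternative tries to avoid.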
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in
> > >>>>>>>>>>>>>>>>>>>> mind is to use this to control how frequently we try to
> > >>>>>>>>>>>>>>>>>>>> emit final results. Maybe it's more flexible to make it a
> > >>>>>>>>>>>>>>>>>>>> config in the properties, since then we don't need to
> > >>>>>>>>>>>>>>>>>>>> recompile the DSL to change it.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I see; making it a config seems better. Frankly, I am not even
> > >>>>>>>>>>>>>>>>>> sure if we need a config at all, or if we can just hard-code
> > >>>>>>>>>>>>>>>>>> it. For the stream-stream left/outer join fix, there is only
> > >>>>>>>>>>>>>>>>>> an internal config and no public config either.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Option 1: Is your proposal the following?
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>        stream
> > >>>>>>>>>>>>>>>>>>          .groupByKey()
> > >>>>>>>>>>>>>>>>>>          .windowBy(TimeWindow.ofSizeNoGrace(...))
> > >>>>>>>>>>>>>>>>>>          .configure(EmitConfig.emitFinal())
> > >>>>>>>>>>>>>>>>>>          .count()
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> That does not change my argument that it seems misplaced from
> > >>>>>>>>>>>>>>>>>> an API flow POV.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> For options 2 and 3, I am not sure which one I like better. It
> > >>>>>>>>>>>>>>>>>> might be good if others could chime in, too. I think I slightly
> > >>>>>>>>>>>>>>>>>> prefer option 2 over option 3.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On 3/15/22 5:33 PM, Hao Li wrote:
> > >>>>>>>>>>>>>>>>>>> Thanks for the feedback Matthias.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I have in
> > >>>>>>>>>>>>>>>>>>> mind is to use this to control how frequently we try to
> > >>>>>>>>>>>>>>>>>>> emit final results. Maybe it's more flexible to make it a
> > >>>>>>>>>>>>>>>>>>> config in the properties, since then we don't need to
> > >>>>>>>>>>>>>>>>>>> recompile the DSL to change it.
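A sketch of reading such a setting from runtime properties instead of the DSL, assuming `java.util.Properties` as the config source: the key `example.emit.interval.ms`, its 1000 ms default and `EmitIntervalConfigSketch` are all made up for illustration and are not Kafka Streams configs.

```java
import java.util.Properties;

// Hypothetical sketch: the emit-attempt interval is read from runtime properties,
// so it can change without recompiling the topology. Key and default are made up.
final class EmitIntervalConfigSketch {

    static final String EMIT_INTERVAL_KEY = "example.emit.interval.ms"; // hypothetical key
    static final long DEFAULT_EMIT_INTERVAL_MS = 1000L;                 // hypothetical default

    static long emitIntervalMs(Properties props) {
        String raw = props.getProperty(EMIT_INTERVAL_KEY);
        return raw == null ? DEFAULT_EMIT_INTERVAL_MS : Long.parseLong(raw.trim());
    }

    // True when at least one interval has elapsed since the last attempt to emit final results.
    static boolean shouldTryEmit(long nowMs, long lastAttemptMs, long intervalMs) {
        return nowMs - lastAttemptMs >= intervalMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(EMIT_INTERVAL_KEY, "250");
        long interval = emitIntervalMs(props);
        System.out.println(interval + " " + shouldTryEmit(1_000L, 800L, interval));
    }
}
```

Changing the interval then only requires different properties at startup, not a recompiled topology.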
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> For option 1, I intend to use `emitFinal` to configure how the
> > >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` should be output to a `KTable` after
> > >>>>>>>>>>>>>>>>>>> aggregation. But `emitFinal` is not an action on the
> > >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream` interface. Maybe adding
> > >>>>>>>>>>>>>>>>>>> `configure(EmitConfig config)` makes more sense?
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> For option 2, the config can be created using
> > >>>>>>>>>>>>>>>>>>> `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> For option 3, it will be something like
> > >>>>>>>>>>>>>>>>>>> `TimeWindows(..., EmitConfig emitConfig)`.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> For putting `EmitConfig` in the aggregation operator, I think it doesn't
> > >>>>>>>>>>>>>>>>>>> control how we do the aggregation but how we output to `KTable`. That's
> > >>>>>>>>>>>>>>>>>>> why I feel option 1 makes more sense, as it applies to
> > >>>>>>>>>>>>>>>>>>> `TimeWindowedKStream`. But I'm also OK with option 2.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
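The placements compared in the message above (option 1 as a separate `configure(EmitConfig)` call, option 2 as a second `windowedBy` argument, option 3 on the window definition, and the config passed to the aggregation operator) can be sketched side by side. This is a minimal, self-contained Java model under stated assumptions, not the Kafka Streams API: `EmitConfig` is modeled as an enum with hypothetical values `ON_EACH_UPDATE` and `FINAL`, and `TimeWindowsSketch`, `WindowedStream`, and the static `windowedBy` methods are hypothetical stand-ins. Every call site ends up with the same emit behavior; what differs is where in the DSL chain the user states it.

```java
import java.util.Objects;

// Hypothetical model of the API placements discussed in the thread; not real Kafka classes.
public class EmitPlacementSketch {

    // Hypothetical stand-in for the proposed EmitConfig.
    enum EmitConfig { ON_EACH_UPDATE, FINAL }

    // Hypothetical window definition; option 3 lets it carry an EmitConfig.
    static final class TimeWindowsSketch {
        final long sizeMs;
        final EmitConfig emit; // null means "not set on the window"

        TimeWindowsSketch(long sizeMs) { this(sizeMs, null); }

        TimeWindowsSketch(long sizeMs, EmitConfig emit) {
            this.sizeMs = sizeMs;
            this.emit = emit;
        }
    }

    // Hypothetical windowed stream; count() just describes the aggregation it would build.
    static final class WindowedStream {
        private EmitConfig emit;

        WindowedStream(EmitConfig emit) { this.emit = emit; }

        // Option 1: a separate configure(...) call on the windowed stream.
        WindowedStream configure(EmitConfig config) {
            this.emit = Objects.requireNonNull(config);
            return this; // fluent, so an aggregation can follow
        }

        String count() { return "count[" + emit + "]"; }

        // Operator-level variant: the aggregation itself takes the config.
        String count(EmitConfig config) { return "count[" + Objects.requireNonNull(config) + "]"; }
    }

    // Existing entry point; under option 3 it reads the config off the window definition.
    static WindowedStream windowedBy(TimeWindowsSketch windows) {
        return new WindowedStream(windows.emit != null ? windows.emit : EmitConfig.ON_EACH_UPDATE);
    }

    // Option 2: a windowedBy overload taking the config as a second argument.
    static WindowedStream windowedBy(TimeWindowsSketch windows, EmitConfig config) {
        return new WindowedStream(Objects.requireNonNull(config));
    }

    public static void main(String[] args) {
        TimeWindowsSketch w = new TimeWindowsSketch(5_000L);
        System.out.println(windowedBy(w).configure(EmitConfig.FINAL).count());                    // option 1
        System.out.println(windowedBy(w, EmitConfig.FINAL).count());                              // option 2
        System.out.println(windowedBy(new TimeWindowsSketch(5_000L, EmitConfig.FINAL)).count()); // option 3
        System.out.println(windowedBy(w).count(EmitConfig.FINAL));                                // operator
    }
}
```

All four lines print `count[FINAL]`. The sketch only shows the call-site shape; the trade-off debated in the thread (how many public signatures each option touches, and whether the emit behavior reads as part of the window or of the operator) is not something the model settles.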
> > >>>>>>>>>>>>>>>>>>> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> A general comment: it seems that we won't need any new `allowedLateness`
> > >>>>>>>>>>>>>>>>>>>> parameter, because the grace period is already defined on the window
> > >>>>>>>>>>>>>>>>>>>> itself?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> (On the other hand, if I think about it once more, maybe the
> > >>>>>>>>>>>>>>>>>>>> `grace-period` is actually not a property of the window but a property
> > >>>>>>>>>>>>>>>>>>>> of the aggregation operator? _thinking_)
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> From an API flow point of view, option 1 might not be desirable IMHO:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>         stream
> > >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> > >>>>>>>>>>>>>>>>>>>>           .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > >>>>>>>>>>>>>>>>>>>>           .emitFinal()
> > >>>>>>>>>>>>>>>>>>>>           .count()
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The call to `emitFinal()` does not seem to be in the right place for
> > >>>>>>>>>>>>>>>>>>>> this case?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Option 2 might work (I think we need to discuss a few details of the API
> > >>>>>>>>>>>>>>>>>>>> though):
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>         stream
> > >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> > >>>>>>>>>>>>>>>>>>>>           .windowedBy(
> > >>>>>>>>>>>>>>>>>>>>             TimeWindows.ofSizeWithNoGrace(...),
> > >>>>>>>>>>>>>>>>>>>>             EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >>>>>>>>>>>>>>>>>>>>           )
> > >>>>>>>>>>>>>>>>>>>>           .count()
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I made up the `WindowConfig.emitFinal()` call; from the KIP, it's unclear
> > >>>>>>>>>>>>>>>>>>>> what API you have in mind. `EmitFinalConfig` has no public constructor
> > >>>>>>>>>>>>>>>>>>>> nor any builder method.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> For option 3, I am not sure what you really have in mind. Can you give
> > >>>>>>>>>>>>>>>>>>>> a concrete example (similar to above) of how users would write their
> > >>>>>>>>>>>>>>>>>>>> code?
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Did you consider actually passing the `EmitConfig` into the aggregation
> > >>>>>>>>>>>>>>>>>>>> operator? In the end, it seems to be not a property of the window
> > >>>>>>>>>>>>>>>>>>>> definition or windowing step, but a property of the actual operator:
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>         stream
> > >>>>>>>>>>>>>>>>>>>>           .groupByKey()
> > >>>>>>>>>>>>>>>>>>>>           .windowedBy(
> > >>>>>>>>>>>>>>>>>>>>             TimeWindows.ofSizeWithNoGrace(...)
> > >>>>>>>>>>>>>>>>>>>>           )
> > >>>>>>>>>>>>>>>>>>>>           .count(EmitConfig.emitFinal())
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> The API surface area that needs to be updated might be larger for this
> > >>>>>>>>>>>>>>>>>>>> case, though...
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> On 3/14/22 9:21 PM, Hao Li wrote:
> > >>>>>>>>>>>>>>>>>>>>> Thanks, Guozhang!
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> > >>>>>>>>>>>>>>>>>>>>> modifies fewer places. What do you think of option 1, which doesn't change
> > >>>>>>>>>>>>>>>>>>>>> the current `windowedBy` API but configures `EmitConfig` separately? The
> > >>>>>>>>>>>>>>>>>>>>> benefit of option 1 is that if we need to configure something else later,
> > >>>>>>>>>>>>>>>>>>>>> we don't need to pile it all onto `windowedBy` but can add separate APIs.
> > >>>>>>>>>>>>>>>>>>>>> 2. I added it to `Stores` mainly to conform to
> > >>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231 .
> > >>>>>>>>>>>>>>>>>>>>> But we can also create an internal API to do that without modifying
> > >>>>>>>>>>>>>>>>>>>>> `Stores`.
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Hello Hao,
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Thanks for the proposal. I have some preferences among the options here,
> > >>>>>>>>>>>>>>>>>>>>>> so I will copy them here:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> I'm now thinking it may be better not to add this new config on each of
> > >>>>>>>>>>>>>>>>>>>>>> the Window interfaces, but instead add it at the KGroupedStream#windowedBy
> > >>>>>>>>>>>>>>>>>>>>>> function. Also, instead of adding just a boolean flag, maybe we can add a
> > >>>>>>>>>>>>>>>>>>>>>> Configured class like Grouped, Suppressed, etc. For example, let's call it
> > >>>>>>>>>>>>>>>>>>>>>> Emitted, which for now would just have a single construct,
> > >>>>>>>>>>>>>>>>>>>>>> Emitted.atWindowClose, whose semantics are the same as emitFinal == true.
> > >>>>>>>>>>>>>>>>>>>>>> I think the benefits are:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 1) You do not need to modify multiple Window classes, but just overload
> > >>>>>>>>>>>>>>>>>>>>>> one windowedBy function with a second param. This is a smaller scope for
> > >>>>>>>>>>>>>>>>>>>>>> now, and also more extensible for any future changes.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 2) With a config interface, we maintain its extensibility as well as
> > >>>>>>>>>>>>>>>>>>>>>> being able to reuse this Emitted interface for other operators if we
> > >>>>>>>>>>>>>>>>>>>>>> wanted to expand to them.
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> ----------------------------
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> So in general I'm leaning towards option 2). For that, some more detailed
> > >>>>>>>>>>>>>>>>>>>>>> comments:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> 1) If we want to reuse that config object for other non-window stateful
> > >>>>>>>>>>>>>>>>>>>>>> operations, I think naming it `EmitConfig` is probably better than
> > >>>>>>>>>>>>>>>>>>>>>> `WindowConfig`.
> > >>>>>>>>>>>>>>>>>>>>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) in which
> > >>>>>>>>>>>>>>>>>>>>>> you are also proposing to add new stores to the public factory Stores,
> > >>>>>>>>>>>>>>>>>>>>>> but it's not included in the KIP. Is that intentional? Personally, I think
> > >>>>>>>>>>>>>>>>>>>>>> that although we may eventually want to add a new store type to the
> > >>>>>>>>>>>>>>>>>>>>>> public APIs, for this KIP maybe we do not have to add them but can delay
> > >>>>>>>>>>>>>>>>>>>>>> that until after we've learned the best way to lay them out. LMK what you
> > >>>>>>>>>>>>>>>>>>>>>> think?
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
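Guozhang's suggestion of a Configured class in the style of Grouped and Suppressed, passed through a single `windowedBy` overload, might look roughly like the sketch below. Only the names `Emitted` and `atWindowClose` come from the message above; `EmittedSketch`, `GroupedStream`, `WindowedStream`, `defaults()`, and modeling the window as a plain size in milliseconds are hypothetical simplifications, and none of this is the actual Kafka Streams API. The point it illustrates is the extensibility argument: the constructor is private, so later constructs would be new static factories rather than new operator signatures.

```java
import java.util.Objects;

// Hypothetical sketch of an Emitted config object; not a real Kafka Streams class.
public class EmittedSketch {

    // Config object in the style of Grouped/Suppressed: immutable, built via static factories.
    static final class Emitted {
        private final boolean atWindowClose;

        // Private constructor: new constructs can be added later as extra static factories
        // without changing the windowedBy(...) signature.
        private Emitted(boolean atWindowClose) {
            this.atWindowClose = atWindowClose;
        }

        // The single construct proposed in the thread; same semantics as emitFinal == true.
        static Emitted atWindowClose() {
            return new Emitted(true);
        }

        // Hypothetical internal default: keep emitting on every update.
        static Emitted defaults() {
            return new Emitted(false);
        }

        boolean isAtWindowClose() {
            return atWindowClose;
        }
    }

    // Hypothetical result of windowedBy: remembers the window size and the emit behavior.
    static final class WindowedStream {
        final long windowSizeMs;
        final Emitted emitted;

        WindowedStream(long windowSizeMs, Emitted emitted) {
            this.windowSizeMs = windowSizeMs;
            this.emitted = Objects.requireNonNull(emitted);
        }
    }

    // Hypothetical grouped stream: the existing windowedBy stays and one overload is added.
    static final class GroupedStream {
        WindowedStream windowedBy(long windowSizeMs) {
            return windowedBy(windowSizeMs, Emitted.defaults());
        }

        WindowedStream windowedBy(long windowSizeMs, Emitted emitted) {
            return new WindowedStream(windowSizeMs, emitted);
        }
    }

    public static void main(String[] args) {
        GroupedStream grouped = new GroupedStream();
        // Existing call sites are unaffected.
        System.out.println(grouped.windowedBy(60_000L).emitted.isAtWindowClose()); // false
        // Opting in to emit-at-window-close via the new overload.
        System.out.println(grouped.windowedBy(60_000L, Emitted.atWindowClose()).emitted.isAtWindowClose()); // true
    }
}
```

Because the Window classes are untouched and only one overload is added, this also reflects the smaller-scope benefit listed in the message.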
> > >>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Hi Dev team,
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> This KIP aims to add new APIs to support outputting final aggregated
> > >>>>>>>>>>>>>>>>>>>>>>> results for windowed aggregations. I listed several options there, and
> > >>>>>>>>>>>>>>>>>>>>>>> I'm looking forward to your feedback.
> > >>>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>>>>>>>>
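To make the goal of the KIP concrete, the toy model below contrasts emitting on every update with emitting a single final result per window. It is a sketch under assumptions, not Kafka Streams internals: a single key, tumbling windows of `sizeMs` aligned to multiples of `sizeMs`, non-negative timestamps, stream time taken as the maximum timestamp seen so far, and a window treated as closed once stream time reaches window end plus grace, after which its records are dropped. The class `EmitFinalSketch` and its method `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy single-key model of windowed counting; not Kafka Streams internals.
public class EmitFinalSketch {

    // Counts records per tumbling window and returns outputs as "windowStart:count".
    // emitFinal == false: emit on every update; emitFinal == true: emit once per window,
    // when stream time reaches windowEnd + graceMs.
    static List<String> run(long[] timestamps, long sizeMs, long graceMs, boolean emitFinal) {
        TreeMap<Long, Long> counts = new TreeMap<>(); // windowStart -> running count
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % sizeMs); // assumes ts >= 0
            boolean closed = streamTime >= start + sizeMs + graceMs;

            if (!closed) {
                long count = counts.merge(start, 1L, Long::sum);
                if (!emitFinal) {
                    out.add(start + ":" + count); // eager mode never evicts; fine for a toy
                }
            } // else: the record is too late for its window and is dropped

            if (emitFinal) {
                // Emit and evict every window whose end + grace has been reached.
                while (!counts.isEmpty()) {
                    Map.Entry<Long, Long> oldest = counts.firstEntry();
                    if (streamTime < oldest.getKey() + sizeMs + graceMs) {
                        break;
                    }
                    out.add(oldest.getKey() + ":" + oldest.getValue());
                    counts.pollFirstEntry();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 12, 5, 25};
        System.out.println(run(ts, 10, 0, false)); // [0:1, 0:2, 10:1, 20:1]
        System.out.println(run(ts, 10, 0, true));  // [0:2, 10:1]
        System.out.println(run(ts, 10, 5, true));  // [0:3, 10:1]
    }
}
```

With no grace, the record at 5 arrives after stream time has reached 12 and is dropped, and window 20 is still open, so final mode emits only two results. With 5 ms of grace, the same record is still accepted and window 0 closes with a count of 3, which is the sense in which the window's grace period already bounds how late a record may be.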
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>> Hao
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> --
> > >>>>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> --
> > >>>>>>>>>>> -- Guozhang
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> --
> > >>>>> Thanks,
> > >>>>> Hao
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
>
>
> --
> Thanks,
> Hao
>


-- 
-- Guozhang
