Hao is right. I think that's the hindsight we have for `suppress`: since it can be applied anywhere on a K(Windowed)Table, it allows an awkward degree of programming flexibility, so I feel it's better to constrain the application scope of the emit strategy.
And I also agree with John that, unless any of us feels strongly about one of the options, Hao can make the final call on the naming.

Guozhang

On Wed, Mar 23, 2022 at 1:49 PM Hao Li <h...@confluent.io.invalid> wrote:
> For
>
> stream
>     .groupBy(..)
>     .windowedBy(..)
>     .aggregate(..)
>     .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>     .mapValues(..)
>
> I think after `aggregate` it's already a table, so the emit strategy comes too late to control how the windowed stream is output to the table. This is the same concern Guozhang raised about having this in the existing `suppress` operator.
>
> Thanks,
> Hao
>
> On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi,
>
> Thank you for your answers to my questions!
>
> I see the argument about the conciseness of configuring a stream with methods instead of config objects. I just miss the descriptive aspect a bit.
>
> What about
>
> stream
>     .groupBy(..)
>     .windowedBy(..)
>     .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>     .aggregate(..)
>     .mapValues(..)
>
> I also have another question. Why should the emitting of results be controlled by the window-level API? If I want to emit results for each input record, the emit strategy is quite independent of the window. So I somewhat share Matthias' and Guozhang's concern that the emit strategy seems misplaced there.
>
> What are the arguments against this?
>
> stream
>     .groupBy(..)
>     .windowedBy(..)
>     .aggregate(..)
>     .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>     .mapValues(..)
>
> A final administrative request: Hao, could you please add the rejected alternatives to the KIP so that future us will know why we rejected them?
>
> Best,
> Bruno
>
> On 23.03.22 19:38, John Roesler wrote:
> Hi all,
>
> I can see both sides of this.
>
> On one hand, when we say "stream.groupBy().windowBy().count()", it seems like we're telling KS to take the raw stream, group it based on key, then window it based on time, and then compute an aggregation on the windows. In that model, "trigger()" would have to mean something like "trigger it", which doesn't really make sense, since we aren't "triggering" the aggregation (then again, to an outside observer, it would appear that way... food for thought).
>
> Another way to look at it is that all we're really doing is configuring a windowed aggregation on the stream, and we're doing it with a progressive builder interface. In other words, the above is just a progressive builder for configuring an operation like "stream.aggregate(groupingConfig, windowingConfig, countFn)". Under the latter interpretation of the DSL, it makes perfect sense to add more optional progressive builder methods like trigger() to the WindowedKStream interfaces.
>
> Since part of the motivation for choosing the word "trigger" here is to stay close to what Flink defines, I'll also point out that Flink's syntax is also "stream.keyBy().window().trigger().aggregate()". Not that their API is the holy grail or anything, but it's at least an indication that this API isn't a horrible mistake.
>
> All other things being equal, I also prefer to leave tie-breakers in the hands of the contributor. So, if we've all said our piece and Hao still prefers option 1, then (as long as we don't think it's a horrible mistake) I think we should just let him go for it.
>
> Speaking of which, after reviewing the responses regarding deprecating `Suppressed#onWindowClose`, I still think we should just go ahead and deprecate it.
> Although it's not expressed exactly the same way, it does exactly the same thing, or something so close that it seems confusing to keep both. But again, if Hao really prefers to keep both, I won't insist on it :)
>
> Thanks all,
> -John
>
> On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> Thanks Bruno!
>
> The arguments for option 1 are:
> 1. Concise and descriptive. It avoids overloading existing functions, and it's very clear what it's doing. Imagine there's an autocomplete feature for our DSL in IntelliJ or another IDE in the future: it's not nice to show 6 `windowedBy` functions.
> 2. Option 1 operates on `windowedStream` to configure how it should be output. Option 2 operates on `KGroupedStream` to produce `windowedStream` and also configure how `windowedStream` should be output. I feel it's better to have a `windowedStream` first and then configure how it is output. Somehow I feel option 2 breaks the builder pattern.
> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all kinds of different parameters into it to avoid future overloading, it becomes bloated and not very user-friendly.
>
> I agree that option 1's `trigger` function configures the stream, which feels different from the existing `count`, `aggregate`, etc. Configuring might also be a kind of action on the stream :) I'm not sure whether it breaks the DSL principle, and if it does, can we relax the principle given its benefits compared to option 2)? Maybe John can chime in as the DSL grammar author.
>
> Thanks,
> Hao
>
> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Hao,
>
> I agree with Guozhang: Great summary! Thank you!
>
> Regarding "aligned with other config class names", there is this DSL grammar John once specified, https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar, and we have already used it in the code. I found the grammar quite useful.
>
> I am undecided whether option 1 is really worth it. What are the actual arguments in favor of it? Is it only that we do not need to overload other methods? That does not seem worth breaking DSL principles for. An alternative proposal would be to go with option 2 and conform to the grammar above:
>
> <W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, WindowedByParameters parameters);
>
> TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, WindowedByParameters parameters);
>
> SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, WindowedByParameters parameters);
>
> This is similar to option 2 in the KIP, but it ensures that we put all future configs into the parameters object, so we do not need to overload the methods anymore.
>
> Then, if we also get KAFKA-10298 done, we could even collapse all `windowedBy()` methods into one.
>
> Best,
> Bruno
>
> On 22.03.22 22:31, Guozhang Wang wrote:
> Thanks for the great summary, Hao. I'm still leaning towards option 2) here, and I'm in favor of `trigger` as the function name and `Triggered` as the config class name (mainly to be aligned with other config class names). I'd also like to see others' preferences between the options, as well as on the naming.
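A minimal sketch of the option-2 shape Bruno proposes, where every future option lives in one parameters object passed to `windowedBy`, might look like the Java below. It is an illustration only: `EmitStrategy`, `WindowedByParameters`, `ToyWindows`, `ToyWindowedStream` and `ToyGroupedStream` are hypothetical stand-ins rather than Kafka Streams classes, and emitting on each update as the default is an assumption.

```java
import java.time.Duration;

// Hypothetical names throughout: these mirror shapes discussed in the thread
// and are not part of the Kafka Streams API.
enum EmitStrategy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

// Option-2 style parameters object: a future option becomes a new field plus a
// new "with" method here, instead of yet another windowedBy(...) overload.
final class WindowedByParameters {
    private final EmitStrategy emitStrategy;

    private WindowedByParameters(EmitStrategy emitStrategy) {
        this.emitStrategy = emitStrategy;
    }

    static WindowedByParameters defaults() {
        return new WindowedByParameters(EmitStrategy.ON_EACH_UPDATE); // assumed default
    }

    // Returns a modified copy, so instances can be shared safely.
    WindowedByParameters withEmitStrategy(EmitStrategy strategy) {
        return new WindowedByParameters(strategy);
    }

    EmitStrategy emitStrategy() {
        return emitStrategy;
    }
}

// Stand-in for a window definition (size plus grace period).
final class ToyWindows {
    final Duration size;
    final Duration grace;

    ToyWindows(Duration size, Duration grace) {
        this.size = size;
        this.grace = grace;
    }
}

// Stand-in for the result of windowedBy(...): it only remembers its configuration.
final class ToyWindowedStream {
    final ToyWindows windows;
    final EmitStrategy emitStrategy;

    ToyWindowedStream(ToyWindows windows, EmitStrategy emitStrategy) {
        this.windows = windows;
        this.emitStrategy = emitStrategy;
    }
}

// Stand-in for a grouped stream: one overload without parameters, one with.
final class ToyGroupedStream {
    ToyWindowedStream windowedBy(ToyWindows windows) {
        return windowedBy(windows, WindowedByParameters.defaults());
    }

    ToyWindowedStream windowedBy(ToyWindows windows, WindowedByParameters parameters) {
        return new ToyWindowedStream(windows, parameters.emitStrategy());
    }
}
```

Adding a later option would mean a new field and `with...` method on the parameters object while the set of `windowedBy` overloads stays fixed; Hao's counter-argument is that such an object risks becoming bloated.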
>
> Guozhang
>
> On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:
>
> `windowedStream.onWindowClose()` was the original option 1 (`windowedStream.emitFinal()`), but it was rejected because we could add more emit types, and that would result in adding more functions. I still prefer the "windowedStream.someFunc(Controlled.onWindowClose)" model since it's flexible and makes it clear that it's configuring the emit policy. Let me summarize all the naming options we have and compare them:
>
> *API function name:*
>
> *1. `windowedStream.trigger()`*
> Pros:
>   i. Simple
>   ii. Similar to Flink's trigger function (is this actually a con?)
> Cons:
>   i. `trigger()` can be confused with Flink's trigger (raised by John)
>   ii. `trigger()` feels like an operation instead of a configuration function (raised by Bruno)?
>
> *2. `windowedStream.emitTrigger()`*
> Pros:
>   i. Avoids confusion with Flink's trigger API
>   ii. `emitTrigger` feels like configuring the trigger, because "trigger" here is a noun instead of a verb as in `trigger()`
> Cons:
>   i. Verbose?
>   ii. Not consistent with `Suppressed.untilWindowClose`?
>
> *Config class/object name:*
>
> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> Cons:
>   i. Doesn't go along with `trigger` (raised by Bruno)
>
> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
>
> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
>
> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> This is a combination of different names like `EmitConfig`, `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
>
> If we settle on option 1), we can add new options to these names and comment on their pros and cons.
>
> Hao
>
> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
> I see what you mean now, and I think it's a fair point that composing `trigger` and `emitted` seems awkward.
>
> Re: data-processing operators vs. control operators, I shared your concern as well, and here's my train of thought: having only data-processing operators was my primary motivation for how we added the suppress operator; it indeed "suppresses" data. But in hindsight, its disadvantage is that, for example, Suppressed.onWindowClose() should only be related to an earlier windowedBy operator, which may be very far from it in the resulting DSL code. It's not only a bit awkward for users to write such code; in such cases the DSL builder also needs to maintain and propagate this information down to the suppress operator. So we are now thinking about "putting the control object as close as possible to where the related processing really happens". In that world, my original preference was something like option 2), i.e. just put the control as a param of the related "windowedBy" operator, but the trade-off is that we keep adding overloaded functions to these operators.
> So after some back-and-forth thinking, I'm leaning towards relaxing our principle of having only processing operators and no flow-control operators. That being said, if you have any ideas for getting the benefits of both worlds, I'm all ears.
>
> Re: using a direct function like "windowedStream.onWindowClose()" vs. "windowedStream.someFunc(Controlled.onWindowClose)", again, my motivation for the latter is extensibility without adding more functions in the future. If people feel this is not worth it, we can go with the first option as well. If we just feel that `trigger` and `emitted` are not composable together, maybe we can consider something like `windowedStream.trigger(Triggered.onWindowClose())`?
>
> Re: windowedBy vs. windowBy, yeah, I do not really have a good reason why we should use the past tense either :P But if it's not bothering people much, I'd say we just keep it rather than deprecate and rename it in new APIs.
>
> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference. Conceptually, I relate `Emitted` to the aggregation and not to `trigger()` in the API flow, because the aggregation emits the result, not `trigger()`. Therefore, I proposed not to use `Emitted` as the name of the config object passed to `trigger()`.
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> Hi Bruno,
>
> Could you elaborate a bit more here: what's the semantic difference between "the aggregation is triggered on window close and all aggregation results are emitted" for trigger(TriggerParameters.onWindowClose()), and "the aggregation is configured to only emit final results" for trigger(Emitted.onWindowClose())?
>
> On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Hao,
>
> Thank you for the KIP!
>
> Regarding option 1, I would not use `Emitted.onWindowClose()`, since that does not seem compatible with the proposed flow. Conceptually, the flow now states that the aggregation is triggered on window close and all aggregation results are emitted. `Emitted` suggests that the aggregation is configured to only emit final results.
>
> Thus, I propose the following:
>
> stream
>     .groupBy(..)
>     .windowedBy(..)
>     .trigger(TriggerParameters.onWindowClose())
>     .aggregate(..) // results in a KTable<Windowed<..>>
>     .mapValues(..)
>
> An alternative to `trigger()` could be `schedule()`, but I do not really like it.
>
> One thing I noticed with option 1 is that all other methods in the example above are operations on data.
> `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, and even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems like a config lost among operations.
>
> However, if we do not want to restrict ourselves to using methods only to specify operations on data, I have the following proposal:
>
> stream
>     .groupBy(..)
>     .windowedBy(..)
>     .onWindowClose()
>     .aggregate(..) // results in a KTable<Windowed<..>>
>     .mapValues(..)
>
> Best,
> Bruno
>
> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use the present tense.
>
> On 22.03.22 06:36, Hao Li wrote:
> Hi John,
>
> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. `emit` sounds like an action to emit? How about `emitTrigger`? I'm open to suggestions for the naming.
>
> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we can deprecate the `Suppressed` config as a whole later. Or we can deprecate `Suppressed.untilWindowClose` in a later KIP, after the implementation of emit-final is done.
>
> BTW, isn't
>
> stream
>     .groupBy(..)
>     .windowBy(..)
>     .aggregate(..) // results in a KTable<Windowed<..>>
>     .mapValues(..)
>     .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find the window definition
>
> the same as
>
> stream
>     .groupBy(..)
>     .windowBy(..)
>     .trigger(Emitted.onWindowClose)
>     .aggregate(..) // results in a KTable<Windowed<..>>
>     .mapValues(..)
> ?
>
> Hao
>
> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> I think the following case is only doable via `suppress`:
>
> stream
>     .groupBy(..)
>     .windowBy(..)
>     .aggregate(..) // results in a KTable<Windowed<..>>
>     .mapValues(..)
>     .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find the window definition
>
> Guozhang
>
> On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:
>
> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn’t cover?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: `Suppressed` is used in `suppress`, whose application scope is much larger and hence more flexible. I.e.
> it can be used anywhere on a `KTable` (though internally we would check whether certain emit policies like `untilWindowClose` are valid or not), whereas `trigger`, for now, is only applicable to XXWindowedKStream. So I think it would not completely replace Suppressed.untilWindowClose.
>
> In the future, I'd personally still like to keep a single control object for all emit policies, and if we extend Emitted to the other emitting policies covered by Suppressed today, we can discuss whether `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)` as a whole; but for this KIP I think it's too early.
>
> Guozhang
>
> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
>
> Hi all,
>
> Thanks for the KIP, Hao!
>
> For what it’s worth, I’m also in favor of your latest framing of the API.
>
> I think the name is fine. I assume it’s inspired by Flink? It’s not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, and that might be confusing to some people who have deep experience with Flink.
> Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream processing experience, it might seem a bit esoteric compared to something self-documenting like ‘emit()’, but the docs should make it clear.
>
> One small question: it seems like this proposal is identical to Suppressed.untilWindowClose, and the KIP states that this API is superior. In that case, should we deprecate Suppressed.untilWindowClose?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> Hi Hao,
>
> For 2), I think it's generally a good idea to use a separate function on the Time/SessionWindowedKStream itself, rather than overloading existing functions, to achieve the effect that, for now, the emitting control applies only to windowed aggregations as in this KIP. We can further discuss the actual function name, e.g. whether others like the name `trigger` or not. For myself, I feel `trigger` is a good one, but I'd like to see whether others have opinions as well.
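To illustrate the option-1 shape described here, a separate function on the windowed stream, the sketch below follows John's "progressive builder" reading of the DSL: `trigger(...)` only records configuration and returns a windowed stream, and the terminal `count()` yields the same specification as a one-shot call that takes all configuration at once. `Emitted`, `AggregationSpec`, `ToyTimeWindowedStream` and `countInOneCall` are hypothetical names, not the actual DSL, and the default strategy is an assumption.

```java
// Hypothetical names throughout; a sketch of the "trigger on the windowed
// stream" shape (option 1), not the Kafka Streams API.
final class Emitted {
    enum Kind { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Kind kind;

    private Emitted(Kind kind) {
        this.kind = kind;
    }

    static Emitted onWindowClose() {
        return new Emitted(Kind.ON_WINDOW_CLOSE);
    }

    static Emitted onEachUpdate() {
        return new Emitted(Kind.ON_EACH_UPDATE);
    }

    Kind kind() {
        return kind;
    }
}

// What the terminal operation "builds": the full configuration of one windowed aggregation.
final class AggregationSpec {
    final long windowSizeMs;
    final Emitted.Kind emit;
    final String aggregation;

    AggregationSpec(long windowSizeMs, Emitted.Kind emit, String aggregation) {
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
        this.aggregation = aggregation;
    }

    boolean sameAs(AggregationSpec other) {
        return windowSizeMs == other.windowSizeMs && emit == other.emit
                && aggregation.equals(other.aggregation);
    }
}

// Progressive builder: trigger() records configuration and returns a windowed
// stream of the same type, so count() can still follow it in the chain.
final class ToyTimeWindowedStream {
    private final long windowSizeMs;
    private final Emitted emitted;

    ToyTimeWindowedStream(long windowSizeMs) {
        this(windowSizeMs, Emitted.onEachUpdate()); // assumed default
    }

    private ToyTimeWindowedStream(long windowSizeMs, Emitted emitted) {
        this.windowSizeMs = windowSizeMs;
        this.emitted = emitted;
    }

    ToyTimeWindowedStream trigger(Emitted emitted) {
        return new ToyTimeWindowedStream(windowSizeMs, emitted); // immutable copy
    }

    AggregationSpec count() {
        return new AggregationSpec(windowSizeMs, emitted.kind(), "count");
    }

    // The same configuration expressed as one call, in the spirit of
    // "stream.aggregate(groupingConfig, windowingConfig, countFn)".
    static AggregationSpec countInOneCall(long windowSizeMs, Emitted emitted) {
        return new AggregationSpec(windowSizeMs, emitted.kind(), "count");
    }
}
```

Under this reading, `trigger` is configuration rather than an operation on data, which is exactly the distinction Bruno raises about option 1.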
>
> Guozhang
>
> On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
>
> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1. I agree to have an `Emitted` control class with two static constructors named `onWindowClose` and `onEachUpdate`.
>
> 2. For the API function changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes an `Emitted` config and returns the same stream. Example:
>
> stream
>     .groupBy(...)
>     .windowedBy(...)
>     .trigger(Emitted.onWindowClose)
>     .count()
>
> The benefits are:
>   1. It's simple and avoids overloading existing functions like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the `aggregate` functions, we would need to add it to all existing `count` and `aggregate` overloads, which is a lot.
>   2. It operates directly on the windowed KStream and specifies how its output should be configured. If we later need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have the `aggregate` or `windowedBy` APIs needed to keep it consistent.
>
> Hao
>
> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> Hello Hao,
>
> I prefer option 2 over the other options mainly because the added config object could potentially be used in other operators as well (it does not necessarily have to be a windowed operator, and hence does not have to be piggy-backed on `windowedBy`; that's also why I suggested not naming it `WindowConfig` but just `EmitConfig`).
>
> As for Matthias' question, I think the difference between the windowed aggregate operator and the stream-stream join operator is that, for the latter, we think emit-final should be the only correct emitting policy, and hence we should not let users configure it. If users configured it to, e.g., emit eagerly, they might get the old spurious emitting behavior, which violates the semantics.
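The difference between emit-final and eager emission can be illustrated with a toy, single-key tumbling-window count. This is not Kafka Streams code. It assumes a window [start, start + size) closes once stream time reaches start + size + grace, drops records whose window has already closed, and forwards either every update (`ON_EACH_UPDATE`) or one final result per window (`ON_WINDOW_CLOSE`). `ToyWindowedCount` and its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-key tumbling-window count; not Kafka Streams code. It assumes a
// window [start, start + size) closes once stream time >= start + size + grace.
final class ToyWindowedCount {
    enum Strategy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    private final long sizeMs;
    private final long graceMs;
    private final Strategy strategy;
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> count
    private long streamTime = Long.MIN_VALUE;

    final List<String> output = new ArrayList<>(); // "windowStart:count" results sent downstream
    int droppedLate = 0;

    ToyWindowedCount(long sizeMs, long graceMs, Strategy strategy) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.strategy = strategy;
    }

    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        if (start + sizeMs + graceMs <= streamTime) {
            droppedLate++; // the record's window has already closed
            return;
        }
        long count = openWindows.merge(start, 1L, Long::sum);
        if (strategy == Strategy.ON_EACH_UPDATE) {
            output.add(start + ":" + count); // every intermediate update is forwarded
        }
        closeExpiredWindows();
    }

    private void closeExpiredWindows() {
        // All windows have the same size, so ordering by start also orders by end.
        while (!openWindows.isEmpty()) {
            Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + sizeMs + graceMs > streamTime) {
                break;
            }
            openWindows.pollFirstEntry();
            if (strategy == Strategy.ON_WINDOW_CLOSE) {
                output.add(oldest.getKey() + ":" + oldest.getValue()); // final result only
            }
        }
    }
}
```

For example, with a 10 ms window, no grace, and timestamps 1, 3, 12, 15, 25, 8 fed in that order, the eager variant outputs `0:1, 0:2, 10:1, 10:2, 20:1` and the final-only variant outputs `0:2, 10:2`; window 20 is still open, and the record at 8 is dropped by both because its window has already closed. The intermediate `0:1` and `10:1` are the kind of results that emit-final holds back.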
emit eager they may get the old spurious emitting > > >>>>>> behavior > > >>>>>>>>>>>> which > > >>>>>>>>>>>>>> is > > >>>>>>>>>>>>>>>>> violating the semantics. > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> For option 2) itself, I have a few more thoughts: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> 1. Thinking about Matthias' suggestions, I'm also > > leaning a > > >>>>>> bit > > >>>>>>>>>>>>>>>>> towards adding the new param in the overloaded > > `aggregate`, > > >>>>>> than > > >>>>>>>>>>>> the > > >>>>>>>>>>>>>>>>> overloaded `windowBy` function. The reason is that the > > >>>>>> emitting > > >>>>>>>>>>>> logic > > >>>>>>>>>>>>>>>> could > > >>>>>>>>>>>>>>>>> be either window based or non-window based, in the long > > run. > > >>>>>>>>>>> Though > > >>>>>>>>>>>>>> for > > >>>>>>>>>>>>>>>>> this KIP we could just add it in > > >>>>>>>>>>> `XXXWindowedKStream.aggregate()`, > > >>>>>>>>>>>> we > > >>>>>>>>>>>>>> may > > >>>>>>>>>>>>>>>>> want to extend to other non-windowed operators in the > > >>>>> future. > > >>>>>>>>>>>>>>>>> 2. To be consistent with other control class names, I > > feel > > >>>>>> maybe > > >>>>>>>>>>> we > > >>>>>>>>>>>>>> can > > >>>>>>>>>>>>>>>>> name it "Emitted", not "EmitConfig". > > >>>>>>>>>>>>>>>>> 3. Following the first comment, I think we can have the > > >>>>> static > > >>>>>>>>>>>>>>>> constructor > > >>>>>>>>>>>>>>>>> names as "onWindowClose" and "onEachUpdate". > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> The resulted code pattern would be like this: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> stream > > >>>>>>>>>>>>>>>>> .groupBy(..) > > >>>>>>>>>>>>>>>>> .windowBy(TimeWindow..) > > >>>>>>>>>>>>>>>>> .count(Emitted.onWindowClose) > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> WDYT? > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. 
Sax < > > >>>>>>>>>>> mj...@apache.org > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>>>> `allowedLateness` may not be a good name. What I > have > > in > > >>>>>>>>>>> mind > > >>>>>>>>>>>> is > > >>>>>>>>>>>>>> to > > >>>>>>>>>>>>>>>>> use > > >>>>>>>>>>>>>>>>>>>> this to control how frequently we try to emit final > > >>>>>> results. > > >>>>>>>>>>>>>> Maybe > > >>>>>>>>>>>>>>>>> it's > > >>>>>>>>>>>>>>>>>>>> more flexible to be used as config in properties as > we > > >>>>>> don't > > >>>>>>>>>>>>>> need to > > >>>>>>>>>>>>>>>>>>>> recompile DSL to change it. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> I see; making it a config seems better. Frankly, I am > > not > > >>>>>> even > > >>>>>>>>>>>> sure > > >>>>>>>>>>>>>> if > > >>>>>>>>>>>>>>>>>> we need a config at all or if we can just hard code > it? > > For > > >>>>>> the > > >>>>>>>>>>>>>>>>>> stream-stream join left/outer join fix, there is only > an > > >>>>>>>>>>> internal > > >>>>>>>>>>>>>>>> config > > >>>>>>>>>>>>>>>>>> but no public config either. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Option 1: Your proposal is? > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> stream > > >>>>>>>>>>>>>>>>>> .groupByKey() > > >>>>>>>>>>>>>>>>>> .windowBy(TimeWindow.ofSizeNoGrace(...)) > > >>>>>>>>>>>>>>>>>> .configure(EmitConfig.emitFinal() > > >>>>>>>>>>>>>>>>>> .count() > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Does not change my argument that it seems to be > misplace > > >>>>> from > > >>>>>>>>>>> an > > >>>>>>>>>>>> API > > >>>>>>>>>>>>>>>>>> flow POV. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> Option 1 seems to be the least desirable to me. > > >>>>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>>>> For option 2 and 3, and not sure which one I like > > better. > > >>>>>> Might > > >>>>>>>>>>>> be > > >>>>>>>>>>>>>> good > > >>>>>>>>>>>>>>>>>> if other could chime in, too. 
> I think I slightly prefer option 2 over option 3.
>
> -Matthias
>
> On 3/15/22 5:33 PM, Hao Li wrote:
> Thanks for the feedback, Matthias.
>
> `allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results. Maybe it's more flexible as a config in properties, since we wouldn't need to recompile the DSL to change it.
>
> For option 1, I intend to use `emitFinal` to configure how `TimeWindowedKStream` should be output to a `KTable` after aggregation. But `emitFinal` is not an action on the `TimeWindowedKStream` interface. Maybe adding `configure(EmitConfig config)` makes more sense?
>
> For option 2, the config can be created using `WindowConfig.emitFinal()` or `EmitConfig.emitFinal`.
>
> For option 3, it will be something like `TimeWindows(..., EmitConfig emitConfig)`.
>
> As for putting `EmitConfig` in the aggregation operator, I think it doesn't control how we do the aggregation but how we output to `KTable`.
> That's why I feel option 1 makes more sense, as it applies to `TimeWindowedKStream`. But I'm also OK with option 2.
>
> Hao
>
> On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks for the KIP.
>
> A general comment: it seems that we won't need any new `allowedLateness` parameter, because the grace period is already defined on the window itself?
>
> (On the other hand, if I think about it once more, maybe the `grace-period` is actually not a property of the window but a property of the aggregation operator? _thinking_)
>
> From an API flow point of view, option 1 might not be desirable IMHO:
>
> stream
>     .groupByKey()
>     .windowBy(TimeWindow.ofSizeNoGrace(...))
>     .emitFinal()
>     .count()
>
> The call to `emitFinal()` does not seem to be in the right place for this case?
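Matthias's first point, that the grace period already lives on the window definition, can be made concrete with a little `java.time` arithmetic. Assuming epoch-aligned tumbling windows that close once stream time reaches window end plus grace, the earliest moment a final result can be emitted follows from the window size and grace alone, so no separate `allowedLateness` value enters the calculation. `WindowCloseMath` is a hypothetical helper, not a Kafka Streams API.

```java
import java.time.Duration;
import java.time.Instant;

// Toy arithmetic for tumbling windows, not Kafka Streams code. Assumption: a
// window [start, start + size) is closed once stream time >= start + size + grace.
final class WindowCloseMath {
    private WindowCloseMath() {}

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long t = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, size.toMillis()));
    }

    // Earliest stream time at which an emit-on-window-close result could be produced.
    static Instant closeTime(Instant timestamp, Duration size, Duration grace) {
        return windowStart(timestamp, size).plus(size).plus(grace);
    }

    // True once stream time has reached the close time of the record's window.
    static boolean isClosed(Instant timestamp, Duration size, Duration grace, Instant streamTime) {
        return !streamTime.isBefore(closeTime(timestamp, size, grace));
    }
}
```

For a record at 00:03 in a 5-minute window, the window starts at 00:00 and closes at 00:05 with no grace, or at 00:06 with one minute of grace.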

Option 2 might work (I think we need to discuss a few details of the API, though):

    stream
        .groupByKey()
        .windowedBy(
            TimeWindows.ofSizeWithNoGrace(...),
            EmitConfig.emitFinal() // just made this up; it's not in the KIP
        )
        .count()

I made up the `WindowConfig.emitFinal()` call; from the KIP it's unclear what API you have in mind. `EmitFinalConfig` has no public constructor nor any builder method.

For option 3, I am not sure what you really have in mind. Can you give a concrete example (similar to the above) of how users would write their code?

Did you consider actually passing the `EmitConfig` into the aggregation operator?
In the end, it seems to be a property not of the window definition or the windowing step, but of the actual operator:

    stream
        .groupByKey()
        .windowedBy(
            TimeWindows.ofSizeWithNoGrace(...)
        )
        .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this case, though...

-Matthias

On 3/14/22 9:21 PM, Hao Li wrote:

Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig`, and option 2 modifies fewer places. What do you think of option 1, which doesn't change the current `windowedBy` API but configures `EmitConfig` separately? The benefit of option 1 is that if we need to configure something else later, we don't need to pile it all onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to <https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231>. But we can also create an internal API to do that without modifying `Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

Thanks for the proposal. I have some preferences among the options here, so I will copy them here:

I'm now thinking it may be better not to add this new config on each of the Window interfaces, but instead add it at the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a Configured class like Grouped, Suppressed, etc., e.g.
let's call it Emitted, which for now would just have a single construct, Emitted.atWindowClose, whose semantics are the same as emitFinal == true. I think the benefits are:

1) You do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is a smaller scope for now, and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility, as well as being able to reuse this Emitted interface for other operators if we wanted to expand to them.

----------------------------

So in general I'm leaning towards option 2).
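Guozhang's suggestion follows the pattern of existing config objects such as `Grouped` and `Suppressed`. The following is a minimal, hypothetical plain-Java sketch of what "overload one windowedBy function with a second param" could look like. It has no Kafka dependency: `Emitted`, `EmitMode`, `GroupedStream` and `WindowedStream` are illustrative stand-ins, not real Kafka Streams types, and the stand-in `windowedBy` takes a plain `Duration` instead of a `Windows` object for brevity.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical sketch of "option 2" with an Emitted config object.
// None of these types exist in Kafka Streams; they are stand-ins.
public class EmittedSketch {

    // Emit behaviors modeled here. Only AT_WINDOW_CLOSE gets a public
    // factory, mirroring the single construct suggested in the thread.
    enum EmitMode { ON_EACH_UPDATE, AT_WINDOW_CLOSE }

    // Config object in the style of Grouped/Suppressed: immutable and
    // created only through a static factory method.
    public static final class Emitted {
        private final EmitMode mode;

        private Emitted(final EmitMode mode) {
            this.mode = Objects.requireNonNull(mode, "mode");
        }

        // Same semantics as "emitFinal == true" in the discussion.
        public static Emitted atWindowClose() {
            return new Emitted(EmitMode.AT_WINDOW_CLOSE);
        }

        EmitMode mode() {
            return mode;
        }
    }

    // Stand-in for a windowed stream: it only records its configuration.
    public static final class WindowedStream {
        final Duration windowSize;
        final EmitMode mode;

        WindowedStream(final Duration windowSize, final EmitMode mode) {
            this.windowSize = windowSize;
            this.mode = mode;
        }
    }

    // Stand-in for KGroupedStream. The existing one-argument windowedBy keeps
    // its current behavior (modeled as ON_EACH_UPDATE); one overload adds the config.
    public static final class GroupedStream {
        public WindowedStream windowedBy(final Duration windowSize) {
            return new WindowedStream(windowSize, EmitMode.ON_EACH_UPDATE);
        }

        public WindowedStream windowedBy(final Duration windowSize, final Emitted emitted) {
            return new WindowedStream(windowSize, emitted.mode());
        }
    }

    public static void main(final String[] args) {
        final GroupedStream grouped = new GroupedStream();
        final WindowedStream onUpdate = grouped.windowedBy(Duration.ofMinutes(5));
        final WindowedStream onClose =
            grouped.windowedBy(Duration.ofMinutes(5), Emitted.atWindowClose());
        System.out.println(onUpdate.mode + " " + onClose.mode); // ON_EACH_UPDATE AT_WINDOW_CLOSE
    }
}
```

The design point the sketch tries to capture: new emit behaviors would be added as new factory methods on the config object, without touching the `Window` classes or adding further `windowedBy` overloads.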
For that, some more detailed comments:

1) If we want to reuse that config object for other non-window stateful operations, I think naming it `EmitConfig` is probably better than `WindowConfig`.
2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you are also proposing to add new stores to the public factory Stores, but that is not included in the KIP. Is that intentional? Personally, I think that although we may eventually want to add a new store type to the public APIs, for this KIP maybe we do not have to add them, and can delay that until after we've learned the best way to lay them out. LMK what you think.

Guozhang

On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated results for windowed aggregations. I listed several options there and I'm looking forward to your feedback.

Thanks,
Hao

-- 
-- Guozhang
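Underneath the naming debate, the behavior KIP-825 targets is emitting a windowed aggregation's final result once, when the window closes, instead of on every update. Below is a toy, single-key Java model of that difference; it is not Kafka Streams code. The class and its methods are hypothetical, and the close rule (a window closes once stream time, the largest timestamp seen so far, reaches window end plus grace period, and records for already-closed windows are dropped) is an assumption of this sketch that follows the grace-period discussion in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-key model of a tumbling-window count that contrasts emitting on
// every update with emitting once when the window closes. This is NOT how
// Kafka Streams is implemented; it only illustrates the two semantics.
public class WindowEmitSketch {

    private final long windowSizeMs;
    private final long graceMs;
    private final boolean emitFinal; // true = emit only when a window closes
    private final TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> count
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // max timestamp seen so far

    WindowEmitSketch(final long windowSizeMs, final long graceMs, final boolean emitFinal) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
        this.emitFinal = emitFinal;
    }

    void process(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        final long start = timestamp - (timestamp % windowSizeMs); // assumes timestamp >= 0
        if (start + windowSizeMs + graceMs <= streamTime) {
            return; // late record: its window has already closed, so it is dropped
        }
        final long count = openWindows.merge(start, 1L, Long::sum);
        if (!emitFinal) {
            emitted.add(format(start, count)); // eager mode: emit every update
        }
        closeExpiredWindows();
    }

    // A window [start, start + size) closes once stream time reaches end + grace.
    private void closeExpiredWindows() {
        while (!openWindows.isEmpty()) {
            final Map.Entry<Long, Long> oldest = openWindows.firstEntry();
            if (oldest.getKey() + windowSizeMs + graceMs > streamTime) {
                break; // tumbling windows: later starts close later, so stop here
            }
            if (emitFinal) {
                emitted.add(format(oldest.getKey(), oldest.getValue())); // one final result
            }
            openWindows.pollFirstEntry();
        }
    }

    private String format(final long start, final long count) {
        return "[" + start + "," + (start + windowSizeMs) + ")=" + count;
    }

    static List<String> run(final long sizeMs, final long graceMs, final boolean emitFinal,
                            final long... timestamps) {
        final WindowEmitSketch sketch = new WindowEmitSketch(sizeMs, graceMs, emitFinal);
        for (final long ts : timestamps) {
            sketch.process(ts);
        }
        return sketch.emitted;
    }

    public static void main(final String[] args) {
        final long[] input = {1, 4, 12, 3, 16, 21, 2, 26}; // 3 is out of order, 2 is too late
        System.out.println("on update: " + run(10, 5, false, input));
        System.out.println("on close:  " + run(10, 5, true, input));
    }
}
```

With a window size of 10 and a grace period of 5, the eager run emits seven updates, while the emit-final run emits only `[0,10)=3` and `[10,20)=2`; window `[20,30)` is still open because stream time (26) has not reached 35, and the record with timestamp 2 is dropped in both runs.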