Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: Suppressed is in `suppress` whose application scope is much
> larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
> (but internally we would check whether certain emit policies like
> `untilWindowClose` are valid or not), whereas `trigger` as for now is only
> applicable in XXWindowedKStream. So I think it would not be completely
> replacing Suppressed.untilWindowClose.
>
> In the future, personally I'd still want to keep one control object
> for all emit policies, and maybe if we have extended Emitted for other
> emitting policies covered by Suppressed today, we can discuss if we could
> have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)`
> as a whole, but for this KIP I think it's too early.
>
>
> Guozhang
>
>
> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi all,
>>
>> Thanks for the KIP, Hao!
>>
>> For what it’s worth, I’m also in favor of your latest framing of the API.
>>
>> I think the name is fine. I assume it’s inspired by Flink? It’s not
>> identical to the concept of a trigger in Flink, which specifies when to
>> evaluate the window, which might be confusing to some people who have deep
>> experience with Flink. Then again, it seems close enough that it should be
>> clear to casual Flink users. For people with no other stream processing
>> experience, it might seem a bit esoteric compared to something
>> self-documenting like ‘emit()’, but the docs should make it clear.
>>
>> One small question: it seems like this proposal is identical to
>> Suppressed.untilWindowClose, and the KIP states that this API is superior.
>> In that case, should we deprecate Suppressed.untilWindowClose?
>>
>> Thanks,
>> John
>>
>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
>> > Hi Hao,
>> >
>> > For 2), I think it's a good idea in general to use a separate function on
>> > the Time/SessionWindowedKStream itself, to achieve the same effect that,
>> > for now, the emitting control is only for windowed aggregations as in this
>> > KIP, rather than overloading existing functions. We can discuss further about
>> > the actual function names, whether others like the name `trigger` or not. As
>> > for myself I feel `trigger` is a good one but I'd like to see if others
>> > have opinions as well.
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for the feedback.
>> >>
>> >> 1. I agree to have an `Emitted` control class and two static constructors
>> >> named `onWindowClose` and `onEachUpdate`.
>> >>
>> >> 2. For the API function changes, I'm thinking of adding a new function
>> >> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
>> >> takes `Emitted` config and returns the same stream. Example:
>> >>
>> >> stream
>> >>   .groupBy(...)
>> >>   .windowedBy(...)
>> >>   .trigger(Emitted.onWindowClose) // N
>> >>   .count()
>> >>
>> >> The benefits are:
>> >> 1. It's simple and avoids creating overloads of existing functions like
>> >> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it to
>> >> `aggregate` functions, we need to add it to all existing `count`,
>> >> `aggregate` overloading functions which is a lot.
>> >> 2. It operates directly on windowed kstream and tells how its output
>> >> should be configured. If later we need to add this to other types of streams,
>> >> we can reuse the same `trigger` API, whereas other types of streams/tables may
>> >> not have `aggregate`, `windowedBy` APIs to make it consistent.
>> >>
>> >> Hao
>> >>
>> >>
>> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >>
>> >> > Hello Hao,
>> >> >
>> >> > I'm preferring option 2 over the other options mainly because the added
>> >> > config object could potentially be used in other operators as well (it does
>> >> > not necessarily have to be a windowed operator and hence piggy-backed
>> >> > on `windowedBy`, and that's also why I suggested not naming it
>> >> > `WindowConfig` but just `EmitConfig`).
>> >> >
>> >> > As for Matthias' question, I think the difference between the windowed
>> >> > aggregate operator and the stream-stream join operator is that, for the
>> >> > latter we think emit-final should be the only right emitting policy and
>> >> > hence we should not let users configure it. If users configure it to
>> >> > e.g. emit eager they may get the old spurious emitting behavior which is
>> >> > violating the semantics.
>> >> >
>> >> > For option 2) itself, I have a few more thoughts:
>> >> >
>> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
>> >> > towards adding the new param in the overloaded `aggregate`, rather than the
>> >> > overloaded `windowBy` function. The reason is that the emitting logic could
>> >> > be either window based or non-window based, in the long run. Though for
>> >> > this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may
>> >> > want to extend to other non-windowed operators in the future.
>> >> > 2. To be consistent with other control class names, I feel maybe we can
>> >> > name it "Emitted", not "EmitConfig".
>> >> > 3. Following the first comment, I think we can have the static constructor
>> >> > names as "onWindowClose" and "onEachUpdate".
>> >> >
>> >> > The resulting code pattern would be like this:
>> >> >
>> >> > stream
>> >> >   .groupBy(..)
>> >> >   .windowBy(TimeWindow..)
>> >> >   .count(Emitted.onWindowClose)
>> >> >
>> >> > WDYT?
>> >> >
>> >> >
>> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >> >
>> >> > > >> `allowedLateness` may not be a good name. What I have in mind is to use
>> >> > > >> this to control how frequently we try to emit final results. Maybe it's
>> >> > > >> more flexible to be used as config in properties as we don't need to
>> >> > > >> recompile DSL to change it.
>> >> > >
>> >> > > I see; making it a config seems better. Frankly, I am not even sure if
>> >> > > we need a config at all or if we can just hard code it? For the
>> >> > > stream-stream join left/outer join fix, there is only an internal config
>> >> > > but no public config either.
>> >> > >
>> >> > > Option 1: Your proposal is?
>> >> > >
>> >> > > stream
>> >> > >   .groupByKey()
>> >> > >   .windowBy(TimeWindow.ofSizeNoGrace(...))
>> >> > >   .configure(EmitConfig.emitFinal())
>> >> > >   .count()
>> >> > >
>> >> > > Does not change my argument that it seems to be misplaced from an API
>> >> > > flow POV.
>> >> > >
>> >> > > Option 1 seems to be the least desirable to me.
>> >> > >
>> >> > > For option 2 and 3, I am not sure which one I like better. Might be good
>> >> > > if others could chime in, too. I think I slightly prefer option 2 over
>> >> > > option 3.
>> >> > >
>> >> > >
>> >> > > -Matthias
>> >> > >
>> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
>> >> > > > Thanks for the feedback Matthias.
>> >> > > >
>> >> > > > `allowedLateness` may not be a good name. What I have in mind is to use
>> >> > > > this to control how frequently we try to emit final results. Maybe it's
>> >> > > > more flexible to be used as config in properties as we don't need to
>> >> > > > recompile DSL to change it.
>> >> > > >
>> >> > > > For option 1, I intend to use `emitFinal` to configure how
>> >> > > > `TimeWindowedKStream` should be outputted to `KTable` after aggregation.
>> >> > > > But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
>> >> > > > Maybe adding `configure(EmitConfig config)` makes more sense?
>> >> > > >
>> >> > > > For option 2, config can be created using `WindowConfig.emitFinal()` or
>> >> > > > `EmitConfig.emitFinal`
>> >> > > >
>> >> > > > For option 3, it will be something like `TimeWindows(..., EmitConfig
>> >> > > > emitConfig)`.
>> >> > > >
>> >> > > > For putting `EmitConfig` in aggregation operator, I think it doesn't
>> >> > > > control how we do aggregation but how we output to `KTable`. That's why I
>> >> > > > feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
>> >> > > > I'm also OK with option 2.
>> >> > > >
>> >> > > > Hao
>> >> > > >
>> >> > > >
>> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >> > > >
>> >> > > >> Thanks for the KIP.
>> >> > > >>
>> >> > > >> A general comment: it seems that we won't need any new `allowedLateness`
>> >> > > >> parameter because the grace-period is defined on the window itself already?
>> >> > > >>
>> >> > > >> (On the other hand, if I think about it once more, maybe the
>> >> > > >> `grace-period` is actually not a property of the window but a property
>> >> > > >> of the aggregation operator? _thinking_)
>> >> > > >>
>> >> > > >> From an API flow point of view, option 1 might not be desirable IMHO:
>> >> > > >>
>> >> > > >> stream
>> >> > > >>   .groupByKey()
>> >> > > >>   .windowBy(TimeWindow.ofSizeNoGrace(...))
>> >> > > >>   .emitFinal()
>> >> > > >>   .count()
>> >> > > >>
>> >> > > >> The call to `emitFinal()` seems not to be in the right place for this case?
>> >> > > >>
>> >> > > >>
>> >> > > >> Option 2 might work (I think we need to discuss a few details of the API
>> >> > > >> though):
>> >> > > >>
>> >> > > >> stream
>> >> > > >>   .groupByKey()
>> >> > > >>   .windowBy(
>> >> > > >>     TimeWindow.ofSizeNoGrace(...),
>> >> > > >>     EmitConfig.emitFinal() -- just made this up; it's not in the KIP
>> >> > > >>   )
>> >> > > >>   .count()
>> >> > > >>
>> >> > > >> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
>> >> > > >> unclear what API you have in mind? `EmitFinalConfig` has no public
>> >> > > >> constructor nor any builder method.
>> >> > > >>
>> >> > > >>
>> >> > > >> For option 3, I am not sure what you really have in mind. Can you give
>> >> > > >> a concrete example (similar to above) how users would write their code?
>> >> > > >>
>> >> > > >>
>> >> > > >> Did you consider actually passing the `EmitConfig` into the
>> >> > > >> aggregation operator? In the end, it seems not to be a property of the
>> >> > > >> window definition or windowing step, but a property of the actual operator:
>> >> > > >>
>> >> > > >> stream
>> >> > > >>   .groupByKey()
>> >> > > >>   .windowBy(
>> >> > > >>     TimeWindow.ofSizeNoGrace(...)
>> >> > > >>   )
>> >> > > >>   .count(EmitConfig.emitFinal())
>> >> > > >>
>> >> > > >> The API surface area that needs to be updated might be larger for this
>> >> > > >> case though...
>> >> > > >>
>> >> > > >>
>> >> > > >> -Matthias
>> >> > > >>
>> >> > > >>
>> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
>> >> > > >>> Thanks Guozhang!
>> >> > > >>>
>> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
>> >> > > >>> fewer places. What do you think of option 1, which doesn't change the current
>> >> > > >>> `windowedBy` API but configures `EmitConfig` separately? The benefit of
>> >> > > >>> option 1 is if we need to configure something else later, we don't need to
>> >> > > >>> pile them on `windowedBy` but can add separate APIs.
>> >> > > >>> 2. I added it to `Stores` mainly to conform to
>> >> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
>> >> > > >>> But we can also create an internal API to do that without modifying
>> >> > > >>> `Stores`.
>> >> > > >>>
>> >> > > >>> Hao
>> >> > > >>>
>> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > > >>>
>> >> > > >>>> Hello Hao,
>> >> > > >>>>
>> >> > > >>>> Thanks for the proposal, I have some preference among the options here so I
>> >> > > >>>> will copy them here:
>> >> > > >>>>
>> >> > > >>>> I'm now thinking if it's better to not add this new config on each of the
>> >> > > >>>> Window interfaces, but instead add that at the KGroupedStream#windowedBy
>> >> > > >>>> function. Also instead of adding just a boolean flag, maybe we can add a
>> >> > > >>>> Configured class like Grouped, Suppressed, etc, e.g. let's call it an
>> >> > > >>>> Emitted which for now would just have a single construct as
>> >> > > >>>> Emitted.atWindowClose whose semantics is the same as emitFinal == true. I
>> >> > > >>>> think the benefits are:
>> >> > > >>>>
>> >> > > >>>> 1) you do not need to modify multiple Window classes, but just overload one
>> >> > > >>>> windowedBy function with a second param. This is less of a scope for now,
>> >> > > >>>> and also more extensible for any future changes.
>> >> > > >>>>
>> >> > > >>>> 2) With a config interface, we maintain its extensibility as well as being
>> >> > > >>>> able to reuse this Emitted interface for other operators if we wanted to
>> >> > > >>>> expand to.
>> >> > > >>>>
>> >> > > >>>> ----------------------------
>> >> > > >>>>
>> >> > > >>>> So in general I'm leaning towards option 2). For that, some more detailed
>> >> > > >>>> comments:
>> >> > > >>>>
>> >> > > >>>> 1) If we want to reuse that config object for other non-window stateful
>> >> > > >>>> operations, I think naming it as `EmitConfig` is probably better than
>> >> > > >>>> `WindowConfig`.
>> >> > > >>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
>> >> > > >>>> also proposing to add new stores into the public factory Stores, but it's
>> >> > > >>>> not included in the KIP. Is that intentional? Personally I think that
>> >> > > >>>> although we may eventually want to add a new store type to the public APIs,
>> >> > > >>>> for this KIP maybe we do not have to add them but can delay for later after
>> >> > > >>>> we've learned the best way to layout. LMK what do you think?
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> Guozhang
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
>> >> > > >>>>
>> >> > > >>>>> Hi Dev team,
>> >> > > >>>>>
>> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
>> >> > > >>>>>
>> >> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>> >> > > >>>>>
>> >> > > >>>>> This KIP is aimed to add new APIs to support outputting final aggregated
>> >> > > >>>>> results for windowed aggregations. I listed several options there and I'm
>> >> > > >>>>> looking forward to your feedback.
>> >> > > >>>>>
>> >> > > >>>>> Thanks,
>> >> > > >>>>> Hao
>> >> > > >>>>>
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> --
>> >> > > >>>> -- Guozhang
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>
>> >> > > >
>> >> > >
>> >> >
>> >> > --
>> >> > -- Guozhang
>> >> >
>> >>
>> >> --
>> >> Thanks,
>> >> Hao
>> >>
>> >
>> > --
>> > -- Guozhang
>> >
>
> --
> -- Guozhang
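
For anyone reading the thread without the KIP open, here is a minimal sketch of the difference between the two emit policies discussed above (emit on each update vs. emit only once a window closes). It is a toy model in plain Java and does not use Kafka Streams at all: `EmitPolicyDemo`, `EmitPolicy`, and `countPerWindow` are hypothetical names made up for illustration, and it assumes tumbling windows, no grace period, and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these types are Kafka Streams APIs.
class EmitPolicyDemo {

    // Hypothetical stand-in for the two policies named in the thread.
    enum EmitPolicy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // Counts records per tumbling window and returns what would be emitted
    // downstream, as "windowStart=count" strings in emission order.
    static List<String> countPerWindow(long[] timestamps, long windowSizeMs, EmitPolicy policy) {
        List<String> emitted = new ArrayList<>();
        TreeMap<Long, Long> openWindows = new TreeMap<>(); // window start -> running count
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            // Stream time only moves forward; out-of-order records do not rewind it.
            streamTime = Math.max(streamTime, ts);

            // Close every window whose end is at or before stream time (assumed: no grace).
            Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + windowSizeMs > streamTime) {
                    break; // sorted by start, so all later windows are still open
                }
                if (policy == EmitPolicy.ON_WINDOW_CLOSE) {
                    emitted.add(window.getKey() + "=" + window.getValue()); // single final result
                }
                it.remove();
            }

            long windowStart = ts - (ts % windowSizeMs);
            if (windowStart + windowSizeMs <= streamTime) {
                continue; // record belongs to an already closed window: dropped
            }

            long count = openWindows.merge(windowStart, 1L, Long::sum);
            if (policy == EmitPolicy.ON_EACH_UPDATE) {
                emitted.add(windowStart + "=" + count); // every intermediate result
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 4, 12, 3, 15, 21};
        // prints [0=1, 0=2, 10=1, 10=2, 20=1]
        System.out.println(countPerWindow(timestamps, 10, EmitPolicy.ON_EACH_UPDATE));
        // prints [0=2, 10=2]
        System.out.println(countPerWindow(timestamps, 10, EmitPolicy.ON_WINDOW_CLOSE));
    }
}
```

In this model the on-window-close policy produces one result per window and nothing for the window that is still open at the end of the input, which is the behavior the thread refers to as emit-final. Where the policy object is passed in the real DSL (`windowedBy`, a separate `trigger` call, or the aggregation itself) is exactly what the thread is debating, so the sketch deliberately takes it as a plain parameter.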