I think the following case is only doable via `suppress`:

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..) // results in a KTable<Windowed<..>>
  .mapValues(..)
  .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition

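A minimal sketch of the parent trace mentioned in the comment above, assuming a hypothetical `Node` type with a `parent` link and an optional window size. None of these names are Kafka Streams classes, and the five-minute window is only an illustrative value; the point is just that a `suppress` placed after `mapValues` can still reach the window definition by walking up the graph:

```java
import java.time.Duration;
import java.util.Optional;

public class SuppressParentTrace {

    /** Hypothetical stand-in for a node in the topology graph; not a Kafka Streams class. */
    static final class Node {
        final String name;
        final Node parent;          // null for the source node
        final Duration windowSize;  // non-null only where a window is defined

        Node(String name, Node parent, Duration windowSize) {
            this.name = name;
            this.parent = parent;
            this.windowSize = windowSize;
        }
    }

    /** Walks from the given node towards the source and returns the nearest window definition, if any. */
    static Optional<Duration> findWindowSize(Node node) {
        for (Node current = node; current != null; current = current.parent) {
            if (current.windowSize != null) {
                return Optional.of(current.windowSize);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Node source = new Node("stream", null, null);
        Node aggregate = new Node("aggregate", source, Duration.ofMinutes(5));
        Node mapValues = new Node("mapValues", aggregate, null);
        Node suppress = new Node("suppress", mapValues, null);

        // suppress has no window of its own, so the lookup climbs past mapValues to aggregate.
        System.out.println("window found: " + findWindowSize(suppress).isPresent());

        // A suppress on a non-windowed table finds nothing, which is where a validity check would fail.
        Node plainSuppress = new Node("suppress", source, null);
        System.out.println("window found: " + findWindowSize(plainSuppress).isPresent());
    }
}
```

An operator attached directly to the windowed stream would not need this walk, since the window definition is already at hand there; the walk only matters once other operators sit between the aggregation and the emit control.
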
Guozhang

On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method
> ‘untilWindowClose’. I might not be thinking clearly about it, though. What
> does untilWindowClose do that this KIP doesn’t cover?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > Just my 2c: Suppressed is in `suppress` whose application scope is much
> > larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
> > (but internally we would check whether certain emit policies like
> > `untilWindowClose` are valid or not), whereas `trigger` as for now is only
> > applicable in XXWindowedKStream. So I think it would not be completely
> > replacing Suppressed.untilWindowClose.
> >
> > In the future, personally I'd still want to keep one control object
> > for all emit policies, and maybe if we have extended Emitted for other
> > emitting policies covered by Suppressed today, we can discuss if we could
> > have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)`
> > as a whole, but for this KIP I think it's too early.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the Kip, Hao!
> >>
> >> For what it’s worth, I’m also in favor of your latest framing of the API.
> >>
> >> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >> identical to the concept of a trigger in Flink, which specifies when to
> >> evaluate the window, which might be confusing to some people who have deep
> >> experience with Flink. Then again, it seems close enough that it should be
> >> clear to casual Flink users. For people with no other stream processing
> >> experience, it might seem a bit esoteric compared to something
> >> self-documenting like ‘emit()’, but the docs should make it clear.
> >>
> >> One small question: it seems like this proposal is identical to
> >> Suppressed.untilWindowClose, and the KIP states that this API is superior.
> >> In that case, should we deprecate Suppressed.untilWindowClose?
> >>
> >> Thanks,
> >> John
> >>
> >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >> > Hi Hao,
> >> >
> >> > For 2), I think it's a good idea in general to use a separate function on
> >> > the Time/SessionWindowedKStream itself, to achieve the same effect that,
> >> > for now, the emitting control is only for windowed aggregations as in this
> >> > KIP, rather than overloading existing functions. We can discuss further about
> >> > the actual function names, whether others like the name `trigger` or not. As
> >> > for myself I feel `trigger` is a good one but I'd like to see if others
> >> > have opinions as well.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Thanks for the feedback.
> >> >>
> >> >> 1. I agree to have an `Emitted` control class and two static constructors
> >> >> named `onWindowClose` and `onEachUpdate`.
> >> >>
> >> >> 2. For the API function changes, I'm thinking of adding a new function
> >> >> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
> >> >> takes `Emitted` config and returns the same stream. Example:
> >> >>
> >> >> stream
> >> >>   .groupBy(...)
> >> >>   .windowedBy(...)
> >> >>   .trigger(Emitted.onWindowClose) // N
> >> >>   .count()
> >> >>
> >> >> The benefits are:
> >> >>   1. It's simple and avoids creating overloads of existing functions like
> >> >> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it to
> >> >> `aggregate` functions, we need to add it to all existing `count`,
> >> >> `aggregate` overloading functions which is a lot.
> >> >>   2. It operates directly on windowed kstream and tells how its output
> >> >> should be configured. If later we need to add this to other types of streams,
> >> >> we can reuse the same `trigger` API, whereas other types of streams/tables may
> >> >> not have `aggregate`, `windowedBy` api to make it consistent.
> >> >>
> >> >> Hao
> >> >>
> >> >>
> >> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >>
> >> >> > Hello Hao,
> >> >> >
> >> >> > I'm preferring option 2 over the other options mainly because the added
> >> >> > config object could potentially be used in other operators as well (it does
> >> >> > not necessarily have to be a windowed operator and hence be piggy-backed
> >> >> > on `windowedBy`, and that's also why I suggested not naming it
> >> >> > `WindowConfig` but just `EmitConfig`).
> >> >> >
> >> >> > As for Matthias' question, I think the difference between the windowed
> >> >> > aggregate operator and the stream-stream join operator is that, for the
> >> >> > latter we think emit-final should be the only right emitting policy and
> >> >> > hence we should not let users configure it. If users configure it to
> >> >> > e.g. emit eager they may get the old spurious emitting behavior which is
> >> >> > violating the semantics.
> >> >> >
> >> >> > For option 2) itself, I have a few more thoughts:
> >> >> >
> >> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> >> >> > towards adding the new param in the overloaded `aggregate`, rather than the
> >> >> > overloaded `windowedBy` function. The reason is that the emitting logic could
> >> >> > be either window based or non-window based, in the long run. Though for
> >> >> > this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may
> >> >> > want to extend to other non-windowed operators in the future.
> >> >> > 2. To be consistent with other control class names, I feel maybe we can
> >> >> > name it "Emitted", not "EmitConfig".
> >> >> > 3. Following the first comment, I think we can have the static constructor
> >> >> > names as "onWindowClose" and "onEachUpdate".
> >> >> >
> >> >> > The resulting code pattern would be like this:
> >> >> >
> >> >> > stream
> >> >> >   .groupBy(..)
> >> >> >   .windowedBy(TimeWindow..)
> >> >> >   .count(Emitted.onWindowClose)
> >> >> >
> >> >> > WDYT?
> >> >> >
> >> >> >
> >> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> >> >> >
> >> >> > > >> `allowedLateness` may not be a good name. What I have in mind is to use
> >> >> > > >> this to control how frequently we try to emit final results. Maybe it's
> >> >> > > >> more flexible to be used as config in properties as we don't need to
> >> >> > > >> recompile DSL to change it.
> >> >> > >
> >> >> > > I see; making it a config seems better. Frankly, I am not even sure if
> >> >> > > we need a config at all or if we can just hard code it? For the
> >> >> > > stream-stream join left/outer join fix, there is only an internal config
> >> >> > > but no public config either.
> >> >> > >
> >> >> > > Option 1: Your proposal is?
> >> >> > >
> >> >> > > stream
> >> >> > >   .groupByKey()
> >> >> > >   .windowedBy(TimeWindow.ofSizeNoGrace(...))
> >> >> > >   .configure(EmitConfig.emitFinal())
> >> >> > >   .count()
> >> >> > >
> >> >> > > Does not change my argument that it seems to be misplaced from an API
> >> >> > > flow POV.
> >> >> > >
> >> >> > > Option 1 seems to be the least desirable to me.
> >> >> > >
> >> >> > > For option 2 and 3, I am not sure which one I like better. Might be good
> >> >> > > if others could chime in, too. I think I slightly prefer option 2 over
> >> >> > > option 3.
> >> >> > >
> >> >> > >
> >> >> > > -Matthias
> >> >> > >
> >> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
> >> >> > > > Thanks for the feedback Matthias.
> >> >> > > >
> >> >> > > > `allowedLateness` may not be a good name. What I have in mind is to use
> >> >> > > > this to control how frequently we try to emit final results. Maybe it's
> >> >> > > > more flexible to be used as config in properties as we don't need to
> >> >> > > > recompile DSL to change it.
> >> >> > > >
> >> >> > > > For option 1, I intend to use `emitFinal` to configure how
> >> >> > > > `TimeWindowedKStream` should be outputted to `KTable` after aggregation.
> >> >> > > > But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
> >> >> > > > Maybe adding `configure(EmitConfig config)` makes more sense?
> >> >> > > >
> >> >> > > > For option 2, config can be created using `WindowConfig.emitFinal()` or
> >> >> > > > `EmitConfig.emitFinal`
> >> >> > > >
> >> >> > > > For option 3, it will be something like `TimeWindows(..., EmitConfig
> >> >> > > > emitConfig)`.
> >> >> > > >
> >> >> > > > For putting `EmitConfig` in aggregation operator, I think it doesn't
> >> >> > > > control how we do aggregation but how we output to `KTable`. That's why I
> >> >> > > > feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
> >> >> > > > I'm also OK with option 2.
> >> >> > > >
> >> >> > > > Hao
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >> >> > > >
> >> >> > > >> Thanks for the KIP.
> >> >> > > >>
> >> >> > > >> A general comment: it seems that we won't need any new `allowedLateness`
> >> >> > > >> parameter because the grace-period is defined on the window itself already?
> >> >> > > >>
> >> >> > > >> (On the other hand, if I think about it once more, maybe the
> >> >> > > >> `grace-period` is actually not a property of the window but a property
> >> >> > > >> of the aggregation operator? _thinking_)
> >> >> > > >>
> >> >> > > >> From an API flow point of view, option 1 might not be desirable IMHO:
> >> >> > > >>
> >> >> > > >> stream
> >> >> > > >>   .groupByKey()
> >> >> > > >>   .windowedBy(TimeWindow.ofSizeNoGrace(...))
> >> >> > > >>   .emitFinal()
> >> >> > > >>   .count()
> >> >> > > >>
> >> >> > > >> The call to `emitFinal()` seems not to be in the right place for this case?
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> Option 2 might work (I think we need to discuss a few details of the API
> >> >> > > >> though):
> >> >> > > >>
> >> >> > > >> stream
> >> >> > > >>   .groupByKey()
> >> >> > > >>   .windowedBy(
> >> >> > > >>     TimeWindow.ofSizeNoGrace(...),
> >> >> > > >>     EmitConfig.emitFinal() -- just made this up; it's not in the KIP
> >> >> > > >>   )
> >> >> > > >>   .count()
> >> >> > > >>
> >> >> > > >> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> >> >> > > >> unclear what API you have in mind? `EmitFinalConfig` has no public
> >> >> > > >> constructor nor any builder method.
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> For option 3, I am not sure what you really have in mind. Can you give
> >> >> > > >> a concrete example (similar to above) how users would write their code?
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> Did you consider actually passing the `EmitConfig` into the
> >> >> > > >> aggregation operator? In the end, it seems not to be a property of the
> >> >> > > >> window definition or windowing step, but a property of the actual operator:
> >> >> > > >>
> >> >> > > >> stream
> >> >> > > >>   .groupByKey()
> >> >> > > >>   .windowedBy(
> >> >> > > >>     TimeWindow.ofSizeNoGrace(...)
> >> >> > > >>   )
> >> >> > > >>   .count(EmitConfig.emitFinal())
> >> >> > > >>
> >> >> > > >> The API surface area that needs to be updated might be larger for this
> >> >> > > >> case though...
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> -Matthias
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
> >> >> > > >>> Thanks Guozhang!
> >> >> > > >>>
> >> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
> >> >> > > >>> fewer places. What do you think of option 1, which doesn't change the current
> >> >> > > >>> `windowedBy` api but configures `EmitConfig` separately? The benefit of
> >> >> > > >>> option 1 is if we need to configure something else later, we don't need to
> >> >> > > >>> pile them on `windowedBy` but can add separate APIs.
> >> >> > > >>> 2. I added it to `Stores` mainly to conform to
> >> >> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >> >> > > >>> . But we can also create an internal API to do that without modifying
> >> >> > > >>> `Stores`.
> >> >> > > >>>
> >> >> > > >>> Hao
> >> >> > > >>>
> >> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > > >>>
> >> >> > > >>>> Hello Hao,
> >> >> > > >>>>
> >> >> > > >>>> Thanks for the proposal, I have some preference among the options here so I
> >> >> > > >>>> will copy them here:
> >> >> > > >>>>
> >> >> > > >>>> I'm now thinking if it's better to not add this new config on each of the
> >> >> > > >>>> Window interfaces, but instead add that at the KGroupedStream#windowedBy
> >> >> > > >>>> function. Also instead of adding just a boolean flag, maybe we can add a
> >> >> > > >>>> Configured class like Grouped, Suppressed, etc, e.g. let's call it a
> >> >> > > >>>> Emitted which for now would just have a single construct as
> >> >> > > >>>> Emitted.atWindowClose whose semantics is the same as emitFinal == true. I
> >> >> > > >>>> think the benefits are:
> >> >> > > >>>>
> >> >> > > >>>> 1) you do not need to modify multiple Window classes, but just overload one
> >> >> > > >>>> windowedBy function with a second param. This is less of a scope for now,
> >> >> > > >>>> and also more extensible for any future changes.
> >> >> > > >>>>
> >> >> > > >>>> 2) With a config interface, we maintain its extensibility as well as being
> >> >> > > >>>> able to reuse this Emitted interface for other operators if we wanted to
> >> >> > > >>>> expand to.
> >> >> > > >>>>
> >> >> > > >>>> ----------------------------
> >> >> > > >>>>
> >> >> > > >>>> So in general I'm leaning towards option 2). For that, some more detailed
> >> >> > > >>>> comments:
> >> >> > > >>>>
> >> >> > > >>>> 1) If we want to reuse that config object for other non-window stateful
> >> >> > > >>>> operations, I think naming it as `EmitConfig` is probably better than
> >> >> > > >>>> `WindowConfig`.
> >> >> > > >>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
> >> >> > > >>>> also proposing to add new stores into the public factory Stores, but it's
> >> >> > > >>>> not included in the KIP. Is that intentional? Personally I think that
> >> >> > > >>>> although we may eventually want to add a new store type to the public APIs,
> >> >> > > >>>> for this KIP maybe we do not have to add them but can delay for later after
> >> >> > > >>>> we've learned the best way to layout. LMK what do you think?
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> Guozhang
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> Hi Dev team,
> >> >> > > >>>>>
> >> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >> >> > > >>>>>
> >> >> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >> >> > > >>>>>
> >> >> > > >>>>> This KIP is aimed to add new APIs to support outputting final aggregated
> >> >> > > >>>>> results for windowed aggregations. I listed several options there and I'm
> >> >> > > >>>>> looking forward to your feedback.
> >> >> > > >>>>>
> >> >> > > >>>>> Thanks,
> >> >> > > >>>>> Hao
> >> >> > > >>>>>
> >> >> > > >>>>
> >> >> > > >>>> --
> >> >> > > >>>> -- Guozhang
> >> >> > > >>>>
> >> >> > > >>>
> >> >> > > >>
> >> >> > > >
> >> >> > >
> >> >> >
> >> >> > --
> >> >> > -- Guozhang
> >> >> >
> >> >>
> >> >> --
> >> >> Thanks,
> >> >> Hao
> >> >>
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >
> > --
> > -- Guozhang
>

--
-- Guozhang