Hi Guozhang,

Thanks for the feedback.
1. I agree with having an `Emitted` control class with two static constructors named `onWindowClose` and `onEachUpdate`.

2. For the API changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes an `Emitted` config and returns the same windowed stream. Example:

    stream
        .groupBy(...)
        .windowedBy(...)
        .trigger(Emitted.onWindowClose()) // N
        .count()

The benefits are:

1. It's simple and avoids overloading existing functions such as `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the aggregation functions, we would need to add it to all the existing `count` and `aggregate` overloads, which is a lot.

2. It operates directly on the windowed KStream and specifies how its output should be emitted. If we later need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have `aggregate` or `windowedBy` APIs that would keep it consistent.

Hao

On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Hao,
>
> I prefer option 2 over the other options mainly because the added
> config object could potentially be used in other operators as well (not
> necessarily windowed operators, so it does not have to be piggy-backed
> on `windowedBy`; that's also why I suggested not naming it
> `WindowConfig` but just `EmitConfig`).
>
> As for Matthias' question, I think the difference between the windowed
> aggregate operator and the stream-stream join operator is that, for the
> latter, we think emit-final should be the only correct emitting policy,
> and hence we should not let users configure it. If users configured it
> to, e.g., emit eagerly, they could get the old spurious emitting behavior,
> which violates the semantics.
>
> For option 2) itself, I have a few more thoughts:
>
> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> towards adding the new param to the overloaded `aggregate` rather than
> the overloaded `windowedBy` function. The reason is that, in the long
> run, the emitting logic could be either window-based or non-window-based.
> Though for this KIP we could just add it to `XXXWindowedKStream.aggregate()`,
> we may want to extend it to other non-windowed operators in the future.
> 2. To be consistent with other control class names, I feel maybe we can
> name it "Emitted", not "EmitConfig".
> 3. Following the first comment, I think we can have the static constructor
> names as "onWindowClose" and "onEachUpdate".
>
> The resulting code pattern would be like this:
>
> stream
>     .groupBy(..)
>     .windowedBy(TimeWindows..)
>     .count(Emitted.onWindowClose())
>
> WDYT?
>
>
> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > >> `allowedLateness` may not be a good name. What I have in mind is to use
> > >> this to control how frequently we try to emit final results. Maybe it's
> > >> more flexible to make it a config in the properties, as then we don't
> > >> need to recompile the DSL code to change it.
> >
> > I see; making it a config seems better. Frankly, I am not even sure if
> > we need a config at all, or whether we could just hard-code it. For the
> > stream-stream left/outer join fix, there is only an internal config
> > but no public config either.
> >
> > Option 1: is this your proposal?
> >
> > stream
> >     .groupByKey()
> >     .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >     .configure(EmitConfig.emitFinal())
> >     .count()
> >
> > That does not change my argument that it seems to be misplaced from an
> > API flow POV.
> >
> > Option 1 seems to be the least desirable to me.
> >
> > For options 2 and 3, I am not sure which one I like better. It might be
> > good if others could chime in, too. I think I slightly prefer option 2
> > over option 3.
> >
> > -Matthias
> >
> > On 3/15/22 5:33 PM, Hao Li wrote:
> > > Thanks for the feedback, Matthias.
> > >
> > > `allowedLateness` may not be a good name. What I have in mind is to use
> > > this to control how frequently we try to emit final results. Maybe it's
> > > more flexible to make it a config in the properties, as then we don't
> > > need to recompile the DSL code to change it.
> > >
> > > For option 1, I intend to use `emitFinal` to configure how
> > > `TimeWindowedKStream` should be output to a `KTable` after aggregation.
> > > But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
> > > Maybe adding `configure(EmitConfig config)` makes more sense?
> > >
> > > For option 2, the config can be created using `WindowConfig.emitFinal()`
> > > or `EmitConfig.emitFinal()`.
> > >
> > > For option 3, it will be something like `TimeWindows(..., EmitConfig
> > > emitConfig)`.
> > >
> > > As for putting `EmitConfig` in the aggregation operator, I think it
> > > doesn't control how we do the aggregation but how we output to the
> > > `KTable`. That's why I feel option 1 makes more sense, as it applies to
> > > `TimeWindowedKStream`. But I'm also OK with option 2.
> > >
> > > Hao
> > >
> > >
> > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > >> Thanks for the KIP.
> > >>
> > >> A general comment: it seems that we won't need any new `allowedLateness`
> > >> parameter, because the grace period is already defined on the window
> > >> itself?
> > >>
> > >> (On the other hand, if I think about it once more, maybe the
> > >> `grace-period` is actually not a property of the window but a property
> > >> of the aggregation operator? _thinking_)
> > >>
> > >> From an API flow point of view, option 1 might not be desirable IMHO:
> > >>
> > >> stream
> > >>     .groupByKey()
> > >>     .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > >>     .emitFinal()
> > >>     .count()
> > >>
> > >> The call to `emitFinal()` does not seem to be in the right place for
> > >> this case.
> > >>
> > >>
> > >> Option 2 might work (I think we need to discuss a few details of the
> > >> API though):
> > >>
> > >> stream
> > >>     .groupByKey()
> > >>     .windowedBy(
> > >>         TimeWindows.ofSizeWithNoGrace(...),
> > >>         EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >>     )
> > >>     .count()
> > >>
> > >> I made up the `WindowConfig.emitFinal()` call; from the KIP it's
> > >> unclear what API you have in mind. `EmitFinalConfig` has no public
> > >> constructor nor any builder method.
> > >>
> > >>
> > >> For option 3, I am not sure what you really have in mind. Can you give
> > >> a concrete example (similar to the above) of how users would write
> > >> their code?
> > >>
> > >>
> > >> Did you consider actually passing the `EmitConfig` into the
> > >> aggregation operator? In the end, it seems not to be a property of the
> > >> window definition or the windowing step, but a property of the actual
> > >> operator:
> > >>
> > >> stream
> > >>     .groupByKey()
> > >>     .windowedBy(
> > >>         TimeWindows.ofSizeWithNoGrace(...)
> > >>     )
> > >>     .count(EmitConfig.emitFinal())
> > >>
> > >> The API surface area that needs to be updated might be larger for this
> > >> case, though...
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 3/14/22 9:21 PM, Hao Li wrote:
> > >>> Thanks Guozhang!
> > >>>
> > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> > >>> modifies fewer places. What do you think of option 1, which doesn't
> > >>> change the current `windowedBy` API but configures `EmitConfig`
> > >>> separately? The benefit of option 1 is that if we need to configure
> > >>> something else later, we don't need to pile it onto `windowedBy` but
> > >>> can add separate APIs.
> > >>> 2. I added it to `Stores` mainly to conform to
> > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
> > >>> But we can also create an internal API to do that without modifying
> > >>> `Stores`.
> > >>>
> > >>> Hao
> > >>>
> > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> Hello Hao,
> > >>>>
> > >>>> Thanks for the proposal. I have some preference among the options
> > >>>> here, so I will copy them here:
> > >>>>
> > >>>> I'm now thinking it may be better not to add this new config to each
> > >>>> of the Window interfaces, but instead add it to the
> > >>>> KGroupedStream#windowedBy function. Also, instead of adding just a
> > >>>> boolean flag, maybe we can add a Configured class like Grouped,
> > >>>> Suppressed, etc. Let's call it Emitted; for now it would just have a
> > >>>> single static constructor, Emitted.atWindowClose, whose semantics are
> > >>>> the same as emitFinal == true. I think the benefits are:
> > >>>>
> > >>>> 1) You do not need to modify multiple Window classes, but just
> > >>>> overload one windowedBy function with a second param. This is a
> > >>>> smaller scope for now, and also more extensible for any future changes.
> > >>>>
> > >>>> 2) With a config interface, we maintain its extensibility as well as
> > >>>> being able to reuse this Emitted interface for other operators if we
> > >>>> wanted to expand to them.
> > >>>>
> > >>>> ----------------------------
> > >>>>
> > >>>> So in general I'm leaning towards option 2).
> > >>>> For that, some more detailed comments:
> > >>>>
> > >>>> 1) If we want to reuse that config object for other non-window
> > >>>> stateful operations, I think naming it `EmitConfig` is probably
> > >>>> better than `WindowConfig`.
> > >>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) in which
> > >>>> you are also proposing to add new stores to the public factory Stores,
> > >>>> but that is not included in the KIP. Is that intentional? Personally I
> > >>>> think that although we may eventually want to add a new store type to
> > >>>> the public APIs, for this KIP maybe we do not have to add them but can
> > >>>> delay that until we've learned the best way to lay it out. LMK what
> > >>>> you think.
> > >>>>
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hi Dev team,
> > >>>>>
> > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >>>>>
> > >>>>> This KIP aims to add new APIs to support outputting final
> > >>>>> aggregated results for windowed aggregations. I listed several
> > >>>>> options there, and I'm looking forward to your feedback.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Hao
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >
> >
>
> --
> -- Guozhang
>

--
Thanks,
Hao
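For reference, below is a minimal, self-contained Java sketch of the two emit strategies discussed in this thread. It does not use Kafka Streams at all: `Emitted` here is a hypothetical stand-in for the proposed control class (only the static constructor names `onWindowClose` and `onEachUpdate` come from the thread), and `WindowedCountSketch` with its `run` helper is a hypothetical toy, a single-key tumbling-window count. It assumes a window [start, end) is closed once stream time reaches end + grace period, and that records for closed windows are dropped. With `onEachUpdate()` every update is forwarded; with `onWindowClose()` only the final count of each closed window is emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the proposed `Emitted` control class (not a Kafka Streams API). */
final class Emitted {
    enum Strategy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Strategy strategy;

    private Emitted(final Strategy strategy) {
        this.strategy = strategy;
    }

    /** Emit only the final result of a window, once the window has closed. */
    static Emitted onWindowClose() {
        return new Emitted(Strategy.ON_WINDOW_CLOSE);
    }

    /** Emit an updated result for every incoming record (eager emitting). */
    static Emitted onEachUpdate() {
        return new Emitted(Strategy.ON_EACH_UPDATE);
    }

    Strategy strategy() {
        return strategy;
    }
}

/** Hypothetical toy: single-key tumbling-window count illustrating both emit strategies. */
final class WindowedCountSketch {
    private final long sizeMs;
    private final long graceMs;
    private final Emitted emitted;
    private final TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> running count
    private long streamTime = Long.MIN_VALUE;

    WindowedCountSketch(final long sizeMs, final long graceMs, final Emitted emitted) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.emitted = emitted;
    }

    // Assumption: window [start, start + size) is closed once stream time >= end + grace.
    private boolean isClosed(final long windowStart) {
        return windowStart + sizeMs + graceMs <= streamTime;
    }

    /** Processes one record timestamp and returns the results emitted as a consequence. */
    List<String> process(final long timestampMs) {
        final List<String> out = new ArrayList<>();
        streamTime = Math.max(streamTime, timestampMs);
        final long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);
        if (!isClosed(windowStart)) { // records for already-closed windows are dropped
            final long updated = counts.merge(windowStart, 1L, Long::sum);
            if (emitted.strategy() == Emitted.Strategy.ON_EACH_UPDATE) {
                out.add(format(windowStart, updated));
            }
        }
        // Evict closed windows; under ON_WINDOW_CLOSE, emit their final counts first.
        final Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, Long> entry = it.next();
            if (isClosed(entry.getKey())) {
                if (emitted.strategy() == Emitted.Strategy.ON_WINDOW_CLOSE) {
                    out.add(format(entry.getKey(), entry.getValue()));
                }
                it.remove();
            }
        }
        return out;
    }

    private String format(final long windowStart, final long count) {
        return "[" + windowStart + "," + (windowStart + sizeMs) + ")=" + count;
    }

    /** Feeds the timestamps through a fresh sketch and collects everything it emits. */
    static List<String> run(final long sizeMs, final long graceMs, final Emitted emitted,
                            final long... timestamps) {
        final WindowedCountSketch sketch = new WindowedCountSketch(sizeMs, graceMs, emitted);
        final List<String> all = new ArrayList<>();
        for (final long ts : timestamps) {
            all.addAll(sketch.process(ts));
        }
        return all;
    }
}
```

For example, `WindowedCountSketch.run(10, 0, Emitted.onWindowClose(), 1, 5, 12, 3, 25)` emits only `[0,10)=2` and `[10,20)=1`: the late record at time 3 is dropped because window [0,10) has already closed, and window [20,30) is still open, so nothing is emitted for it yet. The same input with `Emitted.onEachUpdate()` emits every intermediate count instead.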