Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks, Hao,

The KIP looks good to me overall, so I'll go ahead and vote.

I did notice a couple of typos, though:

> static EmitStrategy onWindowClose() {
>     return new WindowCloseTrigger();
> }

should return WindowCloseStrategy. The other strategy also references WindowUpdateTrigger, which I'm guessing should be WindowUpdateStrategy, and which should also be specified.

More generally, it seems a bit roundabout to define an interface that specifies an enum along with static methods that return classes that themselves return enum values from the interface. I'm guessing this is all by way of just making the API look nice.

Both of those issues seem relatively minor, though, so I'm comfortable casting my vote at this point.

Thanks for all the good work here,
-John
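John's two notes (the factory return types, and the interface/enum/class layering) are easier to see with the shape written out. Below is a self-contained plain-Java sketch with his fix applied; it is not the Kafka Streams source. The enum name `StrategyType`, the accessor `type()` and the `onWindowUpdate()` factory are assumptions made for illustration.

```java
class EmitStrategyDemo {
    public static void main(String[] args) {
        EmitStrategy strategy = EmitStrategy.onWindowClose();
        // Prints "ON_WINDOW_CLOSE via WindowCloseStrategy"
        System.out.println(strategy.type() + " via " + strategy.getClass().getSimpleName());
    }
}

// Hypothetical model of the interface discussed in the vote.
interface EmitStrategy {

    // The enum the interface declares (name assumed).
    enum StrategyType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Each implementation reports the enum value it stands for.
    StrategyType type();

    // Fixed: builds a WindowCloseStrategy (the draft built a WindowCloseTrigger).
    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }

    // Counterpart factory (name assumed), building a WindowUpdateStrategy
    // instead of the draft's WindowUpdateTrigger.
    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy();
    }
}

final class WindowCloseStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }
}

final class WindowUpdateStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_UPDATE;
    }
}
```

The layering John calls roundabout is visible here: callers only ever see `EmitStrategy`, while each concrete class exists just to report a single enum value.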
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi all,

I just updated the KIP with option 1 as the design and put options 2 and 3 in rejected alternatives. Since Matthias is strongly against `trigger`, I adopted the proposed `EmitStrategy` and dropped the "with" in the function name. So it's like this:

    stream
        .groupBy(..)
        .windowedBy(..)
        .emitStrategy(EmitStrategy.onWindowClose())
        .aggregate(..)
        .mapValues(..)

I used `onWindowClose()` instead of `ON_WINDOW_CLOSE` since `EmitStrategy` is meant to be an interface.

Hao
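Hao's final chain places `emitStrategy(..)` between `windowedBy(..)` and `aggregate(..)`. One way to read the "constrain the scope" argument from this thread is at the type level: only the windowed-stream step exposes the setter, so configuring emission after `aggregate()` does not compile. The sketch below models that with hypothetical types (`GroupedStream`, `WindowedStream`, `WindowedTable`), reduces the strategy to an enum with factory methods, and assumes the default is to emit on every window update; none of it is the Kafka Streams API.

```java
class PlacementDemo {
    public static void main(String[] args) {
        WindowedTable table = new GroupedStream()
            .windowedBy(10L)
            .emitStrategy(EmitStrategy.onWindowClose())
            .aggregate();
        System.out.println(table.configuredStrategy()); // ON_WINDOW_CLOSE

        // Does not compile: WindowedTable has no emitStrategy(...) method.
        // new GroupedStream().windowedBy(10L).aggregate().emitStrategy(EmitStrategy.onWindowClose());
    }
}

// Stand-in for the strategy (assumption): an enum with factory methods so
// the chain reads like EmitStrategy.onWindowClose().
enum EmitStrategy {
    ON_WINDOW_UPDATE, ON_WINDOW_CLOSE;

    static EmitStrategy onWindowClose() { return ON_WINDOW_CLOSE; }

    static EmitStrategy onWindowUpdate() { return ON_WINDOW_UPDATE; }
}

// Grouped step: no way to set an emit strategy here.
final class GroupedStream {
    WindowedStream windowedBy(long windowSizeMs) {
        // Assumed default: emit on every window update.
        return new WindowedStream(windowSizeMs, EmitStrategy.onWindowUpdate());
    }
}

// Windowed step: the only place the strategy can be configured.
final class WindowedStream {
    private final long windowSizeMs;
    private final EmitStrategy strategy;

    WindowedStream(long windowSizeMs, EmitStrategy strategy) {
        this.windowSizeMs = windowSizeMs;
        this.strategy = strategy;
    }

    // Returns a new step with the strategy set; this one stays unchanged.
    WindowedStream emitStrategy(EmitStrategy strategy) {
        return new WindowedStream(windowSizeMs, strategy);
    }

    // Terminal step: the strategy becomes part of the aggregation.
    WindowedTable aggregate() {
        return new WindowedTable(windowSizeMs, strategy);
    }
}

// Result step: read-only, deliberately without emitStrategy(...).
final class WindowedTable {
    private final long windowSizeMs;
    private final EmitStrategy strategy;

    WindowedTable(long windowSizeMs, EmitStrategy strategy) {
        this.windowSizeMs = windowSizeMs;
        this.strategy = strategy;
    }

    long windowSizeMs() { return windowSizeMs; }

    EmitStrategy configuredStrategy() { return strategy; }
}
```

Because the setter exists only on the windowed step, the "too late" placement after `aggregate(..)` is ruled out by the compiler rather than by documentation.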
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Wow. Quite a thread... #namingIsHard :D

I won't repeat all the arguments, which are all very good ones. I can just state my personal favorite option:

    stream
        .groupBy(..)
        .windowedBy(..)
        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
        .aggregate(..)
        .mapValues(..)

It seems to be the best compromise / trade-off across the board. Personally, I would strongly advocate against using `trigger()`!

-Matthias
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hao is right. I think that's the hindsight we have from `suppress`: since it can be applied anywhere on a K(Windowed)Table, it introduces an awkward kind of programming flexibility, and I feel it's better to keep the new API's application scope more constrained.

And I also agree with John that, unless any of us feels strongly about any of the options, Hao could make the final call on the naming.

Guozhang
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
For

    stream
        .groupBy(..)
        .windowedBy(..)
        .aggregate(..)
        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
        .mapValues(..)

I think after `aggregate` it's already a table, so the emit strategy comes too late to control how the windowed stream is output to the table. This is the same concern Guozhang raised about putting this into the existing `suppress` operator.

Thanks,
Hao
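Hao's objection is that the strategy has to govern what the windowed aggregation itself forwards: once `aggregate()` has produced a table, every intermediate update has already been emitted. Bruno's per-record case corresponds to emitting on every update. The toy simulation below contrasts the two behaviours. It is plain Java, not Kafka Streams internals; it assumes tumbling windows identified by their start time, treats a window as closed once stream time reaches window end plus grace, and drops records that arrive for a closed window. `EmitMode` and `WindowedCount.run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EmitDemo {
    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "a", "b"};
        long[] timestamps = {1, 3, 4, 12, 25};
        // [a@0=1, b@0=1, a@0=2, a@10=1, b@20=1]
        System.out.println(WindowedCount.run(keys, timestamps, 10, 0, EmitMode.ON_WINDOW_UPDATE));
        // [a@0=2, b@0=1, a@10=1]
        System.out.println(WindowedCount.run(keys, timestamps, 10, 0, EmitMode.ON_WINDOW_CLOSE));
    }
}

enum EmitMode { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

final class WindowedCount {
    // Counts records per key in tumbling windows and returns what would be
    // forwarded downstream, as "key@windowStart=count", in emission order.
    static List<String> run(String[] keys, long[] timestamps,
                            long sizeMs, long graceMs, EmitMode mode) {
        // windowStart -> (key -> running count), ordered by window start
        TreeMap<Long, TreeMap<String, Long>> open = new TreeMap<>();
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (int i = 0; i < keys.length; i++) {
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - Math.floorMod(ts, sizeMs);

            // Late record: its window has already closed, so it is dropped.
            if (windowStart + sizeMs + graceMs <= streamTime) {
                continue;
            }

            long count = open.computeIfAbsent(windowStart, w -> new TreeMap<>())
                             .merge(keys[i], 1L, Long::sum);
            if (mode == EmitMode.ON_WINDOW_UPDATE) {
                emitted.add(keys[i] + "@" + windowStart + "=" + count);
            }

            // Retire windows closed by the new stream time; only the
            // ON_WINDOW_CLOSE mode forwards their final counts.
            while (!open.isEmpty() && open.firstKey() + sizeMs + graceMs <= streamTime) {
                Map.Entry<Long, TreeMap<String, Long>> closed = open.pollFirstEntry();
                if (mode == EmitMode.ON_WINDOW_CLOSE) {
                    for (Map.Entry<String, Long> e : closed.getValue().entrySet()) {
                        emitted.add(e.getKey() + "@" + closed.getKey() + "=" + e.getValue());
                    }
                }
            }
        }
        return emitted;
    }
}
```

With the same five records, the update mode forwards every intermediate count, while the close mode forwards only the final counts of the windows starting at 0 and 10; the window starting at 20 emits nothing in close mode because stream time (25) never reaches its end (30).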
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi,

Thank you for your answers to my questions!

I see the argument about the conciseness of configuring a stream with methods instead of config objects. I just miss the descriptive aspect a bit.

What about

    stream
        .groupBy(..)
        .windowedBy(..)
        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
        .aggregate(..)
        .mapValues(..)

I also have another question. Why should emitting of results be controlled by the window-level API? If I want to emit results for each input record, the emit strategy is quite independent of the window. So I somewhat share Matthias' and Guozhang's concern that the emit strategy seems misplaced there.

What are the arguments against the following?

    stream
        .groupBy(..)
        .windowedBy(..)
        .aggregate(..)
        .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
        .mapValues(..)

A final administrative request: Hao, could you please add the rejected alternatives to the KIP so that future us will know why we rejected them?

Best,
Bruno
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi all, I can see both sides of this. On one hand, when we say "stream.groupBy().windowBy().count()", it seems like we're telling KS to take the raw stream, group it based on key, then window it based on time, and then compute an aggregation on the windows. In that model, "trigger()" would have to mean something like "trigger it", which doesn't really make sense, since we aren't "triggering" the aggregation (then again, to an outside observer, it would appear that way... food for thought). Another way to look at it is that all we're really doing is configuring a windowed aggreation on the stream, and we're doing it with a progressive builder interface. In other words, the above is just a progressive builder for configuring an operation like "stream.aggregate(groupingConfig, windowingConfig, countFn)". Under the latter interpretation of the DSL, it makes perfect sense to add more optional progressive builder methods like trigger() to the WindowedKStream interfaces. Since part of the motivation for choosing the word "trigger" here is to stay close to what Flink defines, I'll also point out that Flink's syntax is also "stream.keyBy().window().trigger().aggregate()". Not that their API is the holy grail or anything, but it's at least an indication that this API isn't a horrible mistake. All other things being equal, I also prefer to leave tie- breakers in the hands of the contributor. So, if we've all said our piece and Hao still prefers option 1, then (as long as we don't think it's a horrible mistake), I think we should just let him go for it. Speaking of which, after reviewing the responses regarding deprecating `Suppressed#onWindowClose`, I still think we should just go ahead and deprecate it. Although it's not expressed exactly the same way, it still does exactly the same thing, or so close that it seems confusing to keep both. 
But again, if Hao really prefers to keep both, I won't insist on it :) Thanks all, -John On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote: > Thanks Bruno! > > Argument for option 1 is: > 1. Concise and descriptive. It avoids overloading existing functions and > it's very clear what it's doing. Imagine if there's a autocomplete feature > in Intellij or other IDE for our DSL in the future, it's not favorable to > show 6 `windowedBy` functions. > 2. Option 1 is operated on `windowedStream` to configure how it should be > outputted. Option 2 operates on `KGroupedStream` to produce > `windowedStream` as well as configure how `windowedStream` should be > outputted. I feel it's better to have a `windowedStream` and then > configure how it can be outputted. Somehow I feel option 2 breaks the > builder pattern. > 3. `WindowedByParameters` doesn't seem very descriptive. If we put all > kinds of different parameters into it to avoid future overloading, it's too > bloated and not very user friendly. > > I agree option 1's `trigger` function is configuring the stream which feels > different from existing `count` or `aggregate` etc. Configuring might be > also a kind of action to stream :) I'm not sure if it breaks DSL principle > and if it does, > can we relax the principle given the benefits compared to option 2)? Maybe > John can chime in as the DSL grammar author. > > Thanks, > Hao > > On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna wrote: > > > Hi Hao, > > > > I agree with Guozhang: Great summary! Thank you! > > > > Regarding "aligned with other config class names", there is this DSL > > grammar John once specified > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar > > and we have already used it in the code. I found the grammar quite useful. > > > > I am undecided if option 1 is really worth it. What are actually the > > arguments in favor of it? Is it only that we do not need to overload > > other methods? 
This does not seem worth breaking DSL principles for. An > > alternative proposal would be to go with option 2 and conform to the > > grammar above: > > > > TimeWindowedKStream windowedBy(final Windows > > windows, WindowedByParameters parameters); > > > > TimeWindowedKStream windowedBy(final SlidingWindows windows, > > WindowedByParameters parameters); > > > > SessionWindowedKStream windowedBy(final SessionWindows windows, > > WindowedByParameters parameters); > > > > This is similar to option 2 in the KIP, but it ensures that we put all > > configs needed in the future in the parameters object and we do not need to > > overload the methods anymore. > > > > Then if we also get KAFKA-10298 done, we could even collapse all > > `windowedBy()` methods into one. > > > > Best, > > Bruno > > > > On 22.03.22 22:31, Guozhang Wang wrote: > > > Thanks for the great summary Hao. I'm still leaning towards option 2) > > > here, and I'm in favor of `trigger` as the function name, and `Triggered` as the > > > config class name (mainly to be aligned with other config class names). > > > Also want to see others' preferences between the options, as well as the > > >
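John's "progressive builder" reading of the DSL can be illustrated with a toy model: each chained call only records configuration, and the terminal `count()` produces exactly what a single `aggregate(groupingConfig, windowingConfig, countFn)`-style call would. This is a minimal sketch under stated assumptions; `ToyStream`, `ToyGroupedStream`, `ToyWindowedStream`, `EmitMode`, and `AggregationSpec` are hypothetical names, not Kafka Streams classes, and the default emit mode is an assumption.

```java
import java.util.Objects;

// Hypothetical toy model, NOT the Kafka Streams API: each builder step only
// records configuration, and nothing is "executed" until the terminal count().
enum EmitMode { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

// Everything a single "aggregate" call would need, gathered into one value.
record AggregationSpec(String groupingKey, long windowSizeMs, EmitMode emitMode, String aggregator) {}

final class ToyStream {
    ToyGroupedStream groupBy(String groupingKey) {
        return new ToyGroupedStream(groupingKey);
    }

    // The "one big call" reading: all configuration is passed at once.
    AggregationSpec aggregate(String groupingKey, long windowSizeMs, EmitMode emitMode, String aggregator) {
        return new AggregationSpec(groupingKey, windowSizeMs, emitMode, aggregator);
    }
}

final class ToyGroupedStream {
    private final String groupingKey;

    ToyGroupedStream(String groupingKey) {
        this.groupingKey = groupingKey;
    }

    ToyWindowedStream windowedBy(long windowSizeMs) {
        // Assumed default: emit an updated result for every input record.
        return new ToyWindowedStream(groupingKey, windowSizeMs, EmitMode.ON_EACH_UPDATE);
    }
}

final class ToyWindowedStream {
    private final String groupingKey;
    private final long windowSizeMs;
    private final EmitMode emitMode;

    ToyWindowedStream(String groupingKey, long windowSizeMs, EmitMode emitMode) {
        this.groupingKey = groupingKey;
        this.windowSizeMs = windowSizeMs;
        this.emitMode = Objects.requireNonNull(emitMode);
    }

    // Optional progressive-builder step: returns a reconfigured copy.
    ToyWindowedStream trigger(EmitMode newMode) {
        return new ToyWindowedStream(groupingKey, windowSizeMs, newMode);
    }

    // Terminal step: yields exactly what the single-call form would.
    AggregationSpec count() {
        return new AggregationSpec(groupingKey, windowSizeMs, emitMode, "count");
    }
}
```

Under this reading, `trigger()` is just one more optional builder step, the same role Flink's `window().trigger()` plays in its chain.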
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks Bruno! The arguments for option 1 are: 1. Concise and descriptive. It avoids overloading existing functions and it's very clear what it's doing. Imagine there's an autocomplete feature in IntelliJ or another IDE for our DSL in the future: it's not favorable to show 6 `windowedBy` functions. 2. Option 1 operates on `windowedStream` to configure how it should be outputted. Option 2 operates on `KGroupedStream` to produce `windowedStream` as well as configure how `windowedStream` should be outputted. I feel it's better to have a `windowedStream` and then configure how it can be outputted. Somehow I feel option 2 breaks the builder pattern. 3. `WindowedByParameters` doesn't seem very descriptive. If we put all kinds of different parameters into it to avoid future overloading, it's too bloated and not very user-friendly. I agree option 1's `trigger` function is configuring the stream, which feels different from existing `count` or `aggregate` etc. Configuring might also be a kind of action on a stream :) I'm not sure if it breaks the DSL principles and, if it does, can we relax the principle given the benefits compared to option 2)? Maybe John can chime in as the DSL grammar author. Thanks, Hao On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna wrote: > Hi Hao, > > I agree with Guozhang: Great summary! Thank you! > > Regarding "aligned with other config class names", there is this DSL > grammar John once specified > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar > and we have already used it in the code. I found the grammar quite useful. > > I am undecided whether option 1 is really worth it. What are actually the > arguments in favor of it? Is it only that we do not need to overload > other methods? This does not seem worth breaking DSL principles for.
An > alternative proposal would be to go with option 2 and conform to the > grammar above: > > TimeWindowedKStream windowedBy(final Windows > windows, WindowedByParameters parameters); > > TimeWindowedKStream windowedBy(final SlidingWindows windows, > WindowedByParameters parameters); > > SessionWindowedKStream windowedBy(final SessionWindows windows, > WindowedByParameters parameters); > > This is similar to option 2 in the KIP, but it ensures that we put all > configs needed in the future in the parameters object and we do not need to > overload the methods anymore. > > Then if we also get KAFKA-10298 done, we could even collapse all > `windowedBy()` methods into one. > > Best, > Bruno > > On 22.03.22 22:31, Guozhang Wang wrote: > > Thanks for the great summary Hao. I'm still leaning towards option 2) > > here, and I'm in favor of `trigger` as the function name, and `Triggered` as the > > config class name (mainly to be aligned with other config class names). > > Also want to see others' preferences between the options, as well as the > > naming. > > > > > > Guozhang > > > > > > > > On Tue, Mar 22, 2022 at 12:23 PM Hao Li > wrote: > > > >> `windowedStream.onWindowClose()` was the original option 1 > >> (`windowedStream.emitFinal()`) but was rejected > >> because we could add more emit types and this will result in adding more > >> functions. I still prefer the > >> "windowedStream.someFunc(Controlled.onWindowClose)" > >> model since it's flexible and clear that it's configuring the emit > policy. > >> Let me summarize all the naming options we have and compare: > >> > >> *API function name:* > >> > >> *1. `windowedStream.trigger()`* > >> Pros: > >> i. Simple > >> ii. Similar to Flink's trigger function (is this a con > actually?) > >> Cons: > >> i. `trigger()` can be confused with Flink trigger (raised by > John) > >> ii. `trigger()` feels like an operation instead of a configuration > >> function (raised by Bruno)? > >> > >> *2.
`windowedStream.emitTrigger()`* > >> Pros: > >> i. Avoids confusion with Flink's trigger API > >> ii. `emitTrigger` feels like configuring the trigger because > >> "trigger" here is a noun instead of a verb as in `trigger()` > >> Cons: > >> i. Verbose? > >> ii. Not consistent with `Suppressed.untilWindowClose`? > >> > >> > >> *Config class/object name:* > >> > >> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`* > >> Cons: > >> i. Doesn't go along with `trigger` (raised by Bruno) > >> > >> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`* > >> > >> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`* > >> > >> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and > >> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`* > >> This is a combination of different names like: `EmitConfig`, > >> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`... > >> > >> > >> If we settle on option 1), we can add new options to these names > and > >> comment on their pros and cons. > >> > >> Hao > >> > >> > >> > >> > >> > >> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang > wrote: > >> > >>> I see what
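Hao's first argument (avoiding "6 `windowedBy` functions") comes down to counting signatures: with three window-definition types, option 2 doubles every `windowedBy` overload, while option 1 keeps three and adds one configuration method on the windowed stream. The sketch below uses empty stand-in interfaces named after the real window types plus hypothetical `EmitConfig`, `Option1*`, and `Option2*` types; only the method shapes are meant to be illustrative.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Empty stand-ins for the real Kafka Streams types; only the method shapes matter here.
interface Windows {}
interface SlidingWindows {}
interface SessionWindows {}
interface WindowedStream {}
interface EmitConfig {}            // hypothetical option-1 config object
interface WindowedByParameters {}  // hypothetical option-2 parameter object

// Option 1: windowedBy() is unchanged; one config method lives on the windowed stream.
interface Option1GroupedStream {
    WindowedStream windowedBy(Windows windows);
    WindowedStream windowedBy(SlidingWindows windows);
    WindowedStream windowedBy(SessionWindows windows);
}

interface Option1WindowedStream extends WindowedStream {
    Option1WindowedStream trigger(EmitConfig config);
}

// Option 2: each window type gets a second windowedBy() overload taking the config.
interface Option2GroupedStream {
    WindowedStream windowedBy(Windows windows);
    WindowedStream windowedBy(SlidingWindows windows);
    WindowedStream windowedBy(SessionWindows windows);
    WindowedStream windowedBy(Windows windows, WindowedByParameters parameters);
    WindowedStream windowedBy(SlidingWindows windows, WindowedByParameters parameters);
    WindowedStream windowedBy(SessionWindows windows, WindowedByParameters parameters);
}

final class OverloadCounter {
    // Counts methods named `name` declared directly on `type`,
    // i.e. the overloads an IDE would list for that type.
    static long count(Class<?> type, String name) {
        return Arrays.stream(type.getDeclaredMethods())
                .map(Method::getName)
                .filter(name::equals)
                .count();
    }
}
```

Counting gives 3 `windowedBy` overloads plus 1 `trigger` method for option 1, versus 6 `windowedBy` overloads for option 2.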
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Hao, I agree with Guozhang: Great summary! Thank you! Regarding "aligned with other config class names", there is this DSL grammar John once specified https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar and we have already used it in the code. I found the grammar quite useful. I am undecided whether option 1 is really worth it. What are actually the arguments in favor of it? Is it only that we do not need to overload other methods? This does not seem worth breaking DSL principles for. An alternative proposal would be to go with option 2 and conform to the grammar above: TimeWindowedKStream windowedBy(final Windows windows, WindowedByParameters parameters); TimeWindowedKStream windowedBy(final SlidingWindows windows, WindowedByParameters parameters); SessionWindowedKStream windowedBy(final SessionWindows windows, WindowedByParameters parameters); This is similar to option 2 in the KIP, but it ensures that we put all configs needed in the future in the parameters object and we do not need to overload the methods anymore. Then if we also get KAFKA-10298 done, we could even collapse all `windowedBy()` methods into one. Best, Bruno On 22.03.22 22:31, Guozhang Wang wrote: Thanks for the great summary Hao. I'm still leaning towards option 2) here, and I'm in favor of `trigger` as the function name, and `Triggered` as the config class name (mainly to be aligned with other config class names). Also want to see others' preferences between the options, as well as the naming. Guozhang On Tue, Mar 22, 2022 at 12:23 PM Hao Li wrote: `windowedStream.onWindowClose()` was the original option 1 (`windowedStream.emitFinal()`) but was rejected because we could add more emit types and this will result in adding more functions. I still prefer the "windowedStream.someFunc(Controlled.onWindowClose)" model since it's flexible and clear that it's configuring the emit policy. Let me summarize all the naming options we have and compare: *API function name:* *1.
`windowedStream.trigger()`* Pros: i. Simple ii. Similar to Flink's trigger function (is this a con actually?) Cons: i. `trigger()` can be confused with Flink trigger (raised by John) ii. `trigger()` feels like an operation instead of a configuration function (raised by Bruno)? *2. `windowedStream.emitTrigger()`* Pros: i. Avoids confusion with Flink's trigger API ii. `emitTrigger` feels like configuring the trigger because "trigger" here is a noun instead of a verb as in `trigger()` Cons: i. Verbose? ii. Not consistent with `Suppressed.untilWindowClose`? *Config class/object name:* 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`* Cons: i. Doesn't go along with `trigger` (raised by Bruno) 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`* 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`* 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and *`(Emit|Trigger)(Config|Policy).onEachUpdate()`* This is a combination of different names like: `EmitConfig`, `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`... If we settle on option 1), we can add new options to these names and comment on their pros and cons. Hao On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang wrote: I see what you mean now, and I think it's a fair point that composing `trigger` and `Emitted` seems awkward. Re: data processing operators vs. control operators, I share your concern as well, and here's my train of thought: Having only data processing operators was my primary motivation for how we added the suppress operator --- it indeed "suppresses" data. But in hindsight, its disadvantage is that, for example, Suppressed.untilWindowClose() should only be related to an earlier windowedBy operator, which is possibly very far from it in the resulting DSL code. It's not only a bit awkward for users to write such code, but in such cases the DSL builder also needs to maintain and propagate this information to the suppress operator further down.
So we are now thinking about "putting the control object as close as possible to where the related processing really happens". And in that world my original preference was something like option 2), i.e. just put the control as a param of the related "windowedBy" operator, but the trade-off is that we keep adding overloaded functions to these operators. So after some back-and-forth thinking, I'm leaning towards relaxing our principle of only having processing operators and no flow-control operators. That being said, if you have any ideas for getting the benefits of both worlds, I'm all ears. Re: using a direct function like "windowedStream.onWindowClose()" vs. "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation for the latter is extensibility without adding more functions in the future. If people feel this is not worth it, we can do the first option as well. If we just feel
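Bruno's option-2 variant routes every setting through one parameter object, so the three `windowedBy(windows, parameters)` signatures would never need further overloads. Below is a minimal sketch of such an object, assuming an immutable class with a defaults factory and copy-on-write "with" methods. `WindowedByParameters` here is hypothetical, not an existing Kafka Streams class, and `processorName` is only an illustrative stand-in for some future setting.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of the option-2 parameter object; not a real Kafka Streams
// class. New settings become new fields and "with" methods, so the
// windowedBy(windows, parameters) signatures never need new overloads.
enum EmitMode { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

final class WindowedByParameters {
    private final EmitMode emitMode;
    private final String processorName; // stand-in for some future, unrelated setting

    private WindowedByParameters(EmitMode emitMode, String processorName) {
        this.emitMode = Objects.requireNonNull(emitMode);
        this.processorName = processorName;
    }

    // Assumed defaults: emit on every update, no custom name.
    static WindowedByParameters defaults() {
        return new WindowedByParameters(EmitMode.ON_EACH_UPDATE, null);
    }

    // Each "with" method returns a modified copy; instances stay immutable.
    WindowedByParameters withEmitMode(EmitMode mode) {
        return new WindowedByParameters(mode, processorName);
    }

    WindowedByParameters withProcessorName(String name) {
        return new WindowedByParameters(emitMode, name);
    }

    EmitMode emitMode() {
        return emitMode;
    }

    Optional<String> processorName() {
        return Optional.ofNullable(processorName);
    }
}
```

The trade-off Hao raises is visible here: every future setting lands in this one class, which keeps the method count flat but makes the object grow and become less descriptive.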
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks for the great summary Hao. I'm still leaning towards option 2) here, and I'm in favor of `trigger` as the function name, and `Triggered` as the config class name (mainly to be aligned with other config class names). Also want to see others' preferences between the options, as well as the naming. Guozhang On Tue, Mar 22, 2022 at 12:23 PM Hao Li wrote: > `windowedStream.onWindowClose()` was the original option 1 > (`windowedStream.emitFinal()`) but was rejected > because we could add more emit types and this will result in adding more > functions. I still prefer the > "windowedStream.someFunc(Controlled.onWindowClose)" > model since it's flexible and clear that it's configuring the emit policy. > Let me summarize all the naming options we have and compare: > > *API function name:* > > *1. `windowedStream.trigger()`* > Pros: > i. Simple > ii. Similar to Flink's trigger function (is this a con actually?) > Cons: > i. `trigger()` can be confused with Flink trigger (raised by John) > ii. `trigger()` feels like an operation instead of a configuration > function (raised by Bruno)? > > *2. `windowedStream.emitTrigger()`* > Pros: > i. Avoids confusion with Flink's trigger API > ii. `emitTrigger` feels like configuring the trigger because > "trigger" here is a noun instead of a verb as in `trigger()` > Cons: > i. Verbose? > ii. Not consistent with `Suppressed.untilWindowClose`? > > > *Config class/object name:* > > 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`* > Cons: > i. Doesn't go along with `trigger` (raised by Bruno) > > 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`* > > 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`* > > 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and > *`(Emit|Trigger)(Config|Policy).onEachUpdate()`* > This is a combination of different names like: `EmitConfig`, > `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> > > If we settle on option 1), we can add new options to these names and > comment on their pros and cons. > > Hao > > > > > > On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang wrote: > > > I see what you mean now, and I think it's a fair point that composing > > `trigger` and `Emitted` seems awkward. > > > > Re: data processing operators vs. control operators, I share your concern as > > well, and here's my train of thought: Having only data processing operators > > was my primary motivation for how we added the suppress operator --- it > > indeed "suppresses" data. But in hindsight, its disadvantage is that, > > for example, Suppressed.untilWindowClose() should only be related to an > > earlier windowedBy operator, which is possibly very far from it in the > > resulting DSL code. It's not only a bit awkward for users to write such > > code, but in such cases the DSL builder also needs to maintain and > > propagate this information to the suppress operator further down. So we > are > > now thinking about "putting the control object as close as possible to where the > > related processing really happens". And in that world my original > > preference was something like option 2), i.e. just put the control as a > param > > of the related "windowedBy" operator, but the trade-off is that we keep adding > > overloaded functions to these operators. So after some back-and-forth > > thinking, I'm leaning towards relaxing our principle of only having > > processing operators and no flow-control operators. That being said, if > you > > have any ideas for getting the benefits of both worlds, I'm all ears. > > > > Re: using a direct function like "windowedStream.onWindowClose()" vs. > > "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation > > for the latter is extensibility without adding more functions in the > > future. If people feel this is not worth it, we can do the first option as > > well.
If we just feel `trigger` and `Emitted` are not > composable > > together, maybe we can consider something like > > `windowedStream.trigger(Triggered.onWindowClose())`? > > > > Re: windowedBy vs. windowBy, yeah I do not really have a good reason why > > we should use the past tense either :P But if it's not bothering people much > > I'd say we just keep it rather than deprecating it and renaming it in new APIs. > > > > > > On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna > wrote: > > > > > Hi Guozhang, > > > > > > There is no semantic difference. It is a cosmetic difference. > > > Conceptually, I relate `Emitted` with the aggregation and not with > > > `trigger()` in the API flow, because the aggregation emits the result, > > > not `trigger()`. Therefore, I proposed not to use `Emitted` as the name > > > of the config object passed to `trigger()`. > > > > > > > > > Best, > > > Bruno > > > > > > On 22.03.22 17:24, Guozhang Wang wrote: > > > > Hi Bruno, > > > > > > > > Could you elaborate a bit more here, what's the semantic difference > > > between > > > > "the aggregat
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
`windowedStream.onWindowClose()` was the original option 1 (`windowedStream.emitFinal()`) but was rejected because we could add more emit types and this will result in adding more functions. I still prefer the "windowedStream.someFunc(Controlled.onWindowClose)" model since it's flexible and clear that it's configuring the emit policy. Let me summarize all the naming options we have and compare: *API function name:* *1. `windowedStream.trigger()`* Pros: i. Simple ii. Similar to Flink's trigger function (is this a con actually?) Cons: i. `trigger()` can be confused with Flink trigger (raised by John) ii. `trigger()` feels like an operation instead of a configuration function (raised by Bruno)? *2. `windowedStream.emitTrigger()`* Pros: i. Avoids confusion with Flink's trigger API ii. `emitTrigger` feels like configuring the trigger because "trigger" here is a noun instead of a verb as in `trigger()` Cons: i. Verbose? ii. Not consistent with `Suppressed.untilWindowClose`? *Config class/object name:* 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`* Cons: i. Doesn't go along with `trigger` (raised by Bruno) 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`* 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`* 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and *`(Emit|Trigger)(Config|Policy).onEachUpdate()`* This is a combination of different names like: `EmitConfig`, `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`... If we settle on option 1), we can add new options to these names and comment on their pros and cons. Hao On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang wrote: > I see what you mean now, and I think it's a fair point that composing > `trigger` and `Emitted` seems awkward. > > Re: data processing operators vs. control operators, I share your concern as > well, and here's my train of thought: Having only data processing operators > was my primary motivation for how we added the suppress operator --- it > indeed "suppresses" data. But in hindsight, its disadvantage is that, > for example, Suppressed.untilWindowClose() should only be related to an > earlier windowedBy operator, which is possibly very far from it in the > resulting DSL code. It's not only a bit awkward for users to write such > code, but in such cases the DSL builder also needs to maintain and > propagate this information to the suppress operator further down. So we are > now thinking about "putting the control object as close as possible to where the > related processing really happens". And in that world my original > preference was something like option 2), i.e. just put the control as a param > of the related "windowedBy" operator, but the trade-off is that we keep adding > overloaded functions to these operators. So after some back-and-forth > thinking, I'm leaning towards relaxing our principle of only having > processing operators and no flow-control operators. That being said, if you > have any ideas for getting the benefits of both worlds, I'm all ears. > > Re: using a direct function like "windowedStream.onWindowClose()" vs. > "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation > for the latter is extensibility without adding more functions in the > future. If people feel this is not worth it, we can do the first option as > well. If we just feel `trigger` and `Emitted` are not composable > together, maybe we can consider something like > `windowedStream.trigger(Triggered.onWindowClose())`? > > Re: windowedBy vs. windowBy, yeah I do not really have a good reason why > we should use the past tense either :P But if it's not bothering people much > I'd say we just keep it rather than deprecating it and renaming it in new APIs.
> > > On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna wrote: > > > Hi Guozhang, > > > > There is no semantic difference. It is a cosmetic difference. > > Conceptually, I relate `Emitted` with the aggregation and not with > > `trigger()` in the API flow, because the aggregation emits the result, > > not `trigger()`. Therefore, I proposed not to use `Emitted` as the name > > of the config object passed to `trigger()`. > > > > > > Best, > > Bruno > > > > On 22.03.22 17:24, Guozhang Wang wrote: > > > Hi Bruno, > > > > > > Could you elaborate a bit more here, what's the semantic difference > > between > > > "the aggregation is triggered on window close and all aggregation > results > > > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the > > > aggregation is configured to only emit final results." for > > > trigger(Emitted.onWindowClose())? > > > > > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna > > wrote: > > > > > >> Hi Hao, > > >> > > >> Thank you for the KIP! > > >> > > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since > that > > >> does not seem compatible with the proposed flow. Conceptually, now the > > >> flow states that the aggre
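All of the config-class candidates in Hao's summary (`Emitted`, `Triggered`, `EmitTrigger`, `EmitConfig`, ...) share one shape: a small immutable object created through named static factories such as `onWindowClose()` and `onEachUpdate()`. The sketch below uses `Triggered`, Guozhang's preferred name, purely as an example. It is a hypothetical class, not part of Kafka Streams, and it assumes the static-factory style of existing DSL config objects like `Grouped` or `Materialized`.

```java
import java.util.Objects;

// Hypothetical config class (not part of Kafka Streams) in the static-factory
// style of existing DSL config objects such as Grouped or Materialized.
// "Triggered" is one of the candidate names; Emitted, EmitTrigger, or
// EmitConfig would have exactly the same shape.
final class Triggered {
    enum Mode { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Mode mode;

    // Private constructor: callers go through the named factory methods.
    private Triggered(Mode mode) {
        this.mode = Objects.requireNonNull(mode);
    }

    static Triggered onWindowClose() {
        return new Triggered(Mode.ON_WINDOW_CLOSE);
    }

    static Triggered onEachUpdate() {
        return new Triggered(Mode.ON_EACH_UPDATE);
    }

    Mode mode() {
        return mode;
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof Triggered && ((Triggered) other).mode == mode;
    }

    @Override
    public int hashCode() {
        return mode.hashCode();
    }

    @Override
    public String toString() {
        return mode == Mode.ON_WINDOW_CLOSE ? "Triggered.onWindowClose()" : "Triggered.onEachUpdate()";
    }
}
```

This shape also reflects Guozhang's extensibility argument for `someFunc(Controlled.onWindowClose)`: a new emit policy means one more enum constant and factory method, with no new method on the windowed stream.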
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
I see what you mean now, and I think it's a fair point that composing `trigger` and `Emitted` seems awkward. Re: data processing operators vs. control operators, I share your concern as well, and here's my train of thought: Having only data processing operators was my primary motivation for how we added the suppress operator --- it indeed "suppresses" data. But in hindsight, its disadvantage is that, for example, Suppressed.untilWindowClose() should only be related to an earlier windowedBy operator, which is possibly very far from it in the resulting DSL code. It's not only a bit awkward for users to write such code, but in such cases the DSL builder also needs to maintain and propagate this information to the suppress operator further down. So we are now thinking about "putting the control object as close as possible to where the related processing really happens". And in that world my original preference was something like option 2), i.e. just put the control as a param of the related "windowedBy" operator, but the trade-off is that we keep adding overloaded functions to these operators. So after some back-and-forth thinking, I'm leaning towards relaxing our principle of only having processing operators and no flow-control operators. That being said, if you have any ideas for getting the benefits of both worlds, I'm all ears. Re: using a direct function like "windowedStream.onWindowClose()" vs. "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation for the latter is extensibility without adding more functions in the future. If people feel this is not worth it, we can do the first option as well. If we just feel `trigger` and `Emitted` are not composable together, maybe we can consider something like `windowedStream.trigger(Triggered.onWindowClose())`? Re: windowedBy vs. windowBy, yeah I do not really have a good reason why we should use the past tense either :P But if it's not bothering people much I'd say we just keep it rather than deprecating it and renaming it in new APIs.
On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna wrote: > Hi Guozhang, > > There is no semantic difference. It is a cosmetic difference. > Conceptually, I relate `Emitted` with the aggregation and not with > `trigger()` in the API flow, because the aggregation emits the result, > not `trigger()`. Therefore, I proposed not to use `Emitted` as the name > of the config object passed to `trigger()`. > > > Best, > Bruno > > On 22.03.22 17:24, Guozhang Wang wrote: > > Hi Bruno, > > > > Could you elaborate a bit more here, what's the semantic difference > between > > "the aggregation is triggered on window close and all aggregation results > > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the > > aggregation is configured to only emit final results." for > > trigger(Emitted.onWindowClose())? > > > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna > wrote: > > > >> Hi Hao, > >> > >> Thank you for the KIP! > >> > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that > >> does not seem compatible with the proposed flow. Conceptually, now the > >> flow states that the aggregation is triggered on window close and all > >> aggregation results are emitted. `Emitted` suggests that the aggregation > >> is configured to only emit final results. > >> > >> Thus, I propose the following: > >> > >> stream > >> .groupBy(..) > >> .windowedBy(..) > >> .trigger(TriggerParameters.onWindowClose()) > >> .aggregate(..) // results in a windowed KTable > >> .mapValues(..) > >> > >> An alternative to `trigger()` could be `schedule()`, but I do not really > >> like it. > >> > >> One thing I noticed with option 1 is that all other methods in the > >> example above are operations on data. `groupBy()` groups, `windowedBy()` > >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps > >> values, even `suppress()` suppresses intermediate results. But what does > >> `trigger()` do? `trigger()` seems like a config lost among operations.
> >> > >> However, if we do not want to restrict ourselves to using methods only > >> when we want to specify operations on data, I have the following > proposal: > >> > >> stream > >> .groupBy(..) > >> .windowedBy(..) > >> .onWindowClose() > >> .aggregate(..) // results in a windowed KTable > >> .mapValues(..) > >> > >> Best, > >> Bruno > >> > >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other > >> operations also use the present tense. > >> > >> On 22.03.22 06:36, Hao Li wrote: > >>> Hi John, > >>> > >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a > >>> different meaning in our case. `emit` sounds like an action to emit? > How > >>> about `emitTrigger`? I'm open to suggestions for the naming. > >>> > >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we > >> can > >>> deprecate the `Suppressed` config as a whole later. Or we can deprecate > >>> `Suppressed.untilWindowClose` in a later KIP after the implementation of em
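Guozhang's concern is that a downstream `suppress(...untilWindowClose...)` depends on a `windowedBy` that may be several operators upstream, so the DSL builder has to carry or recover that information; Hao's snippet phrases it as "trace back to the parent node to find the window definition". The toy graph below sketches that walk. `Node`, `then`, `findWindowDefinition`, and `hopsToWindowDefinition` are hypothetical names for illustration only; Kafka Streams' internal topology-graph classes are different.

```java
import java.util.Optional;

// Hypothetical toy topology graph; Kafka Streams' real internal graph classes
// differ. Each node knows its parent, and only a windowedBy node carries a
// window size.
final class Node {
    final String operator;
    final Node parent;
    final Long windowSizeMs; // non-null only for a windowedBy node

    Node(String operator, Node parent, Long windowSizeMs) {
        this.operator = operator;
        this.parent = parent;
        this.windowSizeMs = windowSizeMs;
    }

    // Convenience for appending a non-windowing operator downstream.
    Node then(String operator) {
        return new Node(operator, this, null);
    }

    // What a downstream suppress(...untilWindowClose...) must do: walk up the
    // chain until it reaches the windowing operator that defines "window close".
    Optional<Node> findWindowDefinition() {
        for (Node n = this; n != null; n = n.parent) {
            if (n.windowSizeMs != null) {
                return Optional.of(n);
            }
        }
        return Optional.empty();
    }

    // How many parent hops separate this node from the window definition.
    int hopsToWindowDefinition() {
        int hops = 0;
        for (Node n = this; n != null; n = n.parent, hops++) {
            if (n.windowSizeMs != null) {
                return hops;
            }
        }
        return -1; // no window definition upstream
    }
}
```

For the chain `groupBy -> windowedBy -> aggregate -> mapValues -> suppress`, the suppress node is three hops away from `windowedBy`. An emit setting attached directly to the windowed stream, as in option 1, needs no such walk.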
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Bruno, Guozhang, I think `Emitted.onWindowClose` is inspired by `Suppressed.untilWindowClose`. Given `Suppressed.untilWindowClose` is used inside `suppress()`, `Suppressed` sounds good. However, since `trigger()` is a builder-style setting rather than an action, `Emitted` may be a bit off. How about using `emitTrigger()` as the function name? Benefits are: 1. It won't be confused with Flink's `trigger`, as John mentioned. 2. It sounds like a setting instead of an action, whereas `trigger()` sounds like we are triggering some action. For the config type name, how about `EmitConfig` or `EmitTriggerConfig`, to make it clear it's a config for the emit trigger? Hao On Tue, Mar 22, 2022 at 9:43 AM Bruno Cadonna wrote: > Hi Guozhang, > > There is no semantic difference. It is a cosmetic difference. > Conceptually, I relate `Emitted` with the aggregation and not with > `trigger()` in the API flow, because the aggregation emits the result, > not `trigger()`. Therefore, I proposed not to use `Emitted` as the name > of the config object passed to `trigger()`. > > > Best, > Bruno > > On 22.03.22 17:24, Guozhang Wang wrote: > > Hi Bruno, > > > > Could you elaborate a bit more here, what's the semantic difference > between > > "the aggregation is triggered on window close and all aggregation results > > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the > > aggregation is configured to only emit final results." for > > trigger(Emitted.onWindowClose())? > > > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna > wrote: > > > >> Hi Hao, > >> > >> Thank you for the KIP! > >> > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that > >> does not seem compatible with the proposed flow. Conceptually, now the > >> flow states that the aggregation is triggered on window close and all > >> aggregation results are emitted. `Emitted` suggests that the aggregation > >> is configured to only emit final results.
> >> > >> Thus, I propose the following: > >> > >> stream > >> .groupBy(..) > >> .windowedBy(..) > >> .trigger(TriggerParameters.onWindowClose()) > >> .aggregate(..) // results in a windowed KTable > >> .mapValues(..) > >> > >> An alternative to `trigger()` could be `schedule()`, but I do not really > >> like it. > >> > >> One thing I noticed with option 1 is that all other methods in the > >> example above are operations on data. `groupBy()` groups, `windowedBy()` > >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps > >> values, even `suppress()` suppresses intermediate results. But what does > >> `trigger()` do? `trigger()` seems like a config lost among operations. > >> > >> However, if we do not want to restrict ourselves to using methods only > >> when we want to specify operations on data, I have the following > proposal: > >> > >> stream > >> .groupBy(..) > >> .windowedBy(..) > >> .onWindowClose() > >> .aggregate(..) // results in a windowed KTable > >> .mapValues(..) > >> > >> Best, > >> Bruno > >> > >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other > >> operations also use the present tense. > >> > >> On 22.03.22 06:36, Hao Li wrote: > >>> Hi John, > >>> > >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a > >>> different meaning in our case. `emit` sounds like an action to emit? > How > >>> about `emitTrigger`? I'm open to suggestions for the naming. > >>> > >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we > >> can > >>> deprecate the `Suppressed` config as a whole later. Or we can deprecate > >>> `Suppressed.untilWindowClose` in a later KIP after the implementation of emit > >>> final is done. > >>> > >>> BTW, isn't > >>> > >>> stream > >>> .groupBy(..) > >>> .windowBy(..) > >>> .aggregate(..) // results in a windowed KTable > >>> .mapValues(..)
> >>> .suppress(Suppressed.untilWindowClose) // since we can trace back > to > >>> the parent node to find the window definition > >>> > >>> same as > >>> > >>> stream > >>> .groupBy(..) > >>> .windowBy(..) > >>> .trigger(Emitted.onWindowClose) > >>> .aggregate(..) // results in a windowed KTable > >>> .mapValues(..) > >>> ? > >>> > >>> Hao > >>> > >>> > >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang > >> wrote: > >>> > I think the following case is only doable via `suppress`: > > stream > .groupBy(..) > .windowBy(..) > .aggregate(..) // results in a windowed KTable > .mapValues(..) > .suppress(Suppressed.untilWindowClose) // since we can trace back > to > the parent node to find the window definition > > > Guozhang > > > On Mon, Mar 21, 2022 at 6:36 PM John Roesler > >> wrote: > > > Thanks, Guozhang! > > > > To clarify, I was asking specifically about deprecating just the > method > > ‘untilWindowClose’. I might not be thinking clearly about it, though. > What > > does untilWindowClose do that this KIP doesn’t co
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Guozhang, There is no semantic difference. It is a cosmetic difference. Conceptually, I relate `Emitted` with the aggregation and not with `trigger()` in the API flow, because the aggregation emits the result, not `trigger()`. Therefore, I proposed not to use `Emitted` as the name of the config object passed to `trigger()`. Best, Bruno On 22.03.22 17:24, Guozhang Wang wrote: Hi Bruno, Could you elaborate a bit more here, what's the semantic difference between "the aggregation is triggered on window close and all aggregation results are emitted." for trigger(TriggerParameters.onWindowClose()), and "the aggregation is configured to only emit final results." for trigger(Emitted.onWindowClose())? On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna wrote: Hi Hao, Thank you for the KIP! Regarding option 1, I would not use `Emitted.onWindowClose()` since that does not seem compatible with the proposed flow. Conceptually, now the flow states that the aggregation is triggered on window close and all aggregation results are emitted. `Emitted` suggests that the aggregation is configured to only emit final results. Thus, I propose the following: stream .groupBy(..) .windowedBy(..) .trigger(TriggerParameters.onWindowClose()) .aggregate(..) // results in a windowed KTable .mapValues(..) An alternative to `trigger()` could be `schedule()`, but I do not really like it. One thing I noticed with option 1 is that all other methods in the example above are operations on data. `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems like a config lost among operations. However, if we do not want to restrict ourselves to using methods only when we want to specify operations on data, I have the following proposal: stream .groupBy(..) .windowedBy(..) .onWindowClose() .aggregate(..) // results in a windowed KTable .mapValues(..)
Best, Bruno P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use present tense. On 22.03.22 06:36, Hao Li wrote: Hi John, Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. `emit` sounds like an action to emit? How about `emitTrigger`? I'm open to suggestions for the naming. For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we can deprecate `Suppressed` config as a whole later. Or we can deprecate `Suppressed.untilWindowClose` in later KIP after implementation of emit final is done. BTW, isn't stream .groupBy(..) .windowBy(..) .aggregate(..) //result in a KTable> .mapValues(..) .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition same as stream .groupBy(..) .windowBy(..) .trigger(Emitted.onWindowClose) .aggregate(..) //result in a KTable> .mapValues(..) ? Hao On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang wrote: I think the following case is only doable via `suppress`: stream .groupBy(..) .windowBy(..) .aggregate(..) //result in a KTable> .mapValues(..) .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition Guozhang On Mon, Mar 21, 2022 at 6:36 PM John Roesler wrote: Thanks, Guozhang! To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn’t cover? Thanks, John On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote: Just my 2c: Suppressed is in `suppress` whose application scope is much larger and hence more flexible. I.e. it can be used anywhere for a `KTable` (but internally we would check whether certain emit policies like `untilWindowClose` is valid or not), whereas `trigger` as for now is only applicable in XXWindowedKStream. So I think it would not be completely replacing Suppressed.untilWindowClose. 
In the future, personally I'd still want to keep one control object still for all emit policies, and maybe if we have extended Emitted for other emitting policies covered by Suppressed today, we can discuss if we could have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's too early. Guozhang On Mon, Mar 21, 2022 at 6:18 PM John Roesler wrote: Hi all, Thanks for the Kip, Hao! For what it’s worth, I’m also in favor of your latest framing of the API, I think the name is fine. I assume it’s inspired by Flink? It’s not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, which might be confusing to some people who have deep experience with Flink. Then again, it seems close enough that it should be clear to casual Flink users. For people wi
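To illustrate Bruno's point that the difference is only in naming, here is a hypothetical model in which two differently named config classes carry the same policy value. `EmitPolicy`, `Emitted`, and `TriggerParameters` are illustrative stand-ins for the names discussed in this thread, not Kafka Streams API.

```java
class NamingSketch {
    // Hypothetical policy values; not part of Kafka Streams.
    enum EmitPolicy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    // A name that reads as a property of the aggregation's output.
    static final class Emitted {
        final EmitPolicy policy;
        private Emitted(EmitPolicy policy) { this.policy = policy; }
        static Emitted onWindowClose() { return new Emitted(EmitPolicy.ON_WINDOW_CLOSE); }
    }

    // A name that reads as a parameter of trigger().
    static final class TriggerParameters {
        final EmitPolicy policy;
        private TriggerParameters(EmitPolicy policy) { this.policy = policy; }
        static TriggerParameters onWindowClose() { return new TriggerParameters(EmitPolicy.ON_WINDOW_CLOSE); }
    }

    public static void main(String[] args) {
        // Both spellings carry the same policy, so an operator reading it would behave identically.
        System.out.println(Emitted.onWindowClose().policy == TriggerParameters.onWindowClose().policy); // true
    }
}
```

Only the call site reads differently (`trigger(Emitted.onWindowClose())` versus `trigger(TriggerParameters.onWindowClose())`), which is exactly the naming question being debated.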
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Bruno,

Could you elaborate a bit more here? What's the semantic difference between "the aggregation is triggered on window close and all aggregation results are emitted" for trigger(TriggerParameters.onWindowClose()), and "the aggregation is configured to only emit final results" for trigger(Emitted.onWindowClose())?

On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna wrote:
> Hi Hao,
>
> Thank you for the KIP!
>
> Regarding option 1, I would not use `Emitted.onWindowClose()`, since that does not seem compatible with the proposed flow. Conceptually, the flow now states that the aggregation is triggered on window close and all aggregation results are emitted, whereas `Emitted` suggests that the aggregation is configured to only emit final results.
>
> Thus, I propose the following:
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .trigger(TriggerParameters.onWindowClose())
>         .aggregate(..) // results in a KTable
>         .mapValues(..)
>
> An alternative to `trigger()` could be `schedule()`, but I do not really like it.
>
> One thing I noticed with option 1 is that all other methods in the example above are operations on data: `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, and even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems like a config lost among operations.
>
> However, if we do not want to restrict ourselves to using methods only to specify operations on data, I have the following proposal:
>
>     stream
>         .groupBy(..)
>         .windowedBy(..)
>         .onWindowClose()
>         .aggregate(..) // results in a KTable
>         .mapValues(..)
>
> Best,
> Bruno
>
> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use the present tense.
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Hao,

Thank you for the KIP!

Regarding option 1, I would not use `Emitted.onWindowClose()`, since that does not seem compatible with the proposed flow. Conceptually, the flow now states that the aggregation is triggered on window close and all aggregation results are emitted, whereas `Emitted` suggests that the aggregation is configured to only emit final results.

Thus, I propose the following:

    stream
        .groupBy(..)
        .windowedBy(..)
        .trigger(TriggerParameters.onWindowClose())
        .aggregate(..) // results in a KTable
        .mapValues(..)

An alternative to `trigger()` could be `schedule()`, but I do not really like it.

One thing I noticed with option 1 is that all other methods in the example above are operations on data: `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, and even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems like a config lost among operations.

However, if we do not want to restrict ourselves to using methods only to specify operations on data, I have the following proposal:

    stream
        .groupBy(..)
        .windowedBy(..)
        .onWindowClose()
        .aggregate(..) // results in a KTable
        .mapValues(..)

Best,
Bruno

P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use the present tense.

On 22.03.22 06:36, Hao Li wrote:
> Hi John,
>
> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. `emit` sounds like an action that emits something, doesn't it? How about `emitTrigger`? I'm open to suggestions for the naming.
>
> Regarding deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we can deprecate the `Suppressed` config as a whole later. Alternatively, we can deprecate `Suppressed.untilWindowClose` in a later KIP, after emit-final is implemented.
>
> BTW, isn't
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .aggregate(..) // results in a KTable
>         .mapValues(..)
>         .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
>
> the same as
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .trigger(Emitted.onWindowClose)
>         .aggregate(..) // results in a KTable
>         .mapValues(..)
>
> ?
>
> Hao
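The two shapes Bruno contrasts differ only in how the emit setting is spelled in the fluent chain. The following is a minimal, hypothetical sketch (`WindowedStreamSketch`, `TriggerParameters`, and `Mode` are illustrative names, not Kafka Streams types) in which `trigger(TriggerParameters.onWindowClose())` and a dedicated `onWindowClose()` method both return the windowed stream with the same setting recorded, so a following `aggregate(..)` would see identical configuration.

```java
import java.util.Objects;

class TriggerStyles {
    // Hypothetical emit modes; not Kafka Streams API.
    enum Mode { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // Config-object style, as in trigger(TriggerParameters.onWindowClose()).
    static final class TriggerParameters {
        final Mode mode;
        private TriggerParameters(Mode mode) { this.mode = mode; }
        static TriggerParameters onWindowClose() { return new TriggerParameters(Mode.ON_WINDOW_CLOSE); }
    }

    // Immutable stand-in for a windowed stream; each call returns a new, reconfigured instance.
    static final class WindowedStreamSketch {
        final Mode mode;
        WindowedStreamSketch() { this(Mode.ON_EACH_UPDATE); } // assumed default: emit every update
        private WindowedStreamSketch(Mode mode) { this.mode = Objects.requireNonNull(mode); }

        // Style 1: one generic method that accepts a config object.
        WindowedStreamSketch trigger(TriggerParameters params) {
            return new WindowedStreamSketch(params.mode);
        }

        // Style 2: a dedicated method; it reads like an operation but only records configuration.
        WindowedStreamSketch onWindowClose() {
            return new WindowedStreamSketch(Mode.ON_WINDOW_CLOSE);
        }
    }

    public static void main(String[] args) {
        WindowedStreamSketch viaConfig = new WindowedStreamSketch().trigger(TriggerParameters.onWindowClose());
        WindowedStreamSketch viaMethod = new WindowedStreamSketch().onWindowClose();
        System.out.println(viaConfig.mode == viaMethod.mode); // true
    }
}
```

One trade-off the toy makes visible: style 1 keeps a single entry point as more policies are added, while style 2 adds one method per policy to the windowed-stream interface.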
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a different meaning in our case. `emit` sounds like an action that emits something, doesn't it? How about `emitTrigger`? I'm open to suggestions for the naming.

Regarding deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we can deprecate the `Suppressed` config as a whole later. Alternatively, we can deprecate `Suppressed.untilWindowClose` in a later KIP, after emit-final is implemented.

BTW, isn't

    stream
        .groupBy(..)
        .windowBy(..)
        .aggregate(..) // results in a KTable
        .mapValues(..)
        .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition

the same as

    stream
        .groupBy(..)
        .windowBy(..)
        .trigger(Emitted.onWindowClose)
        .aggregate(..) // results in a KTable
        .mapValues(..)

?

Hao

On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang wrote:
> I think the following case is only doable via `suppress`:
>
>     stream
>         .groupBy(..)
>         .windowBy(..)
>         .aggregate(..) // results in a KTable
>         .mapValues(..)
>         .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition
>
> Guozhang
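Hao's question can be checked on a toy model. The following is a simplified, single-threaded simulation, not Kafka Streams internals, under these assumptions: tumbling windows of 10 ms with a 5 ms grace period; a window counts as closed once stream time (the largest timestamp seen so far) reaches `start + size + grace`; late records for closed windows are dropped; and `mapValues` is a stateless function. Under those assumptions, eager emission followed by `mapValues` and a suppress-until-close buffer yields the same final records as emitting on window close and mapping afterwards.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class EmitEquivalence {
    static final long SIZE = 10; // tumbling window size in ms (illustrative)
    static final long GRACE = 5; // grace period in ms (illustrative)

    // Input record: a key and its event timestamp.
    static final class Rec {
        final String key;
        final long ts;
        Rec(String key, long ts) { this.key = key; this.ts = ts; }
    }

    // Emit "key@windowStart=value" for every window that has closed, oldest first.
    static <V> void flushClosed(TreeMap<Long, TreeMap<String, V>> state, long streamTime,
                                Function<V, String> render, List<String> out) {
        Iterator<Map.Entry<Long, TreeMap<String, V>>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, V>> window = it.next();
            if (window.getKey() + SIZE + GRACE > streamTime) break; // this and later windows are still open
            for (Map.Entry<String, V> e : window.getValue().entrySet()) {
                out.add(e.getKey() + "@" + window.getKey() + "=" + render.apply(e.getValue()));
            }
            it.remove();
        }
    }

    // Path 1: count with eager updates, map each update, then buffer until the window closes.
    static List<String> eagerThenSuppress(List<Rec> input, Function<Long, String> mapValues) {
        TreeMap<Long, TreeMap<String, Long>> counts = new TreeMap<>();   // state cleanup omitted
        TreeMap<Long, TreeMap<String, String>> buffer = new TreeMap<>(); // suppression buffer, latest value wins
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts);
            long start = r.ts - Math.floorMod(r.ts, SIZE);
            if (start + SIZE + GRACE <= streamTime) continue; // late: window already closed
            long count = counts.computeIfAbsent(start, s -> new TreeMap<>()).merge(r.key, 1L, Long::sum);
            buffer.computeIfAbsent(start, s -> new TreeMap<>()).put(r.key, mapValues.apply(count));
            flushClosed(buffer, streamTime, Function.identity(), out);
        }
        return out;
    }

    // Path 2: count, emit only when the window closes, then map the final value.
    static List<String> emitOnClose(List<Rec> input, Function<Long, String> mapValues) {
        TreeMap<Long, TreeMap<String, Long>> counts = new TreeMap<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts);
            long start = r.ts - Math.floorMod(r.ts, SIZE);
            if (start + SIZE + GRACE <= streamTime) continue; // late: window already closed
            counts.computeIfAbsent(start, s -> new TreeMap<>()).merge(r.key, 1L, Long::sum);
            flushClosed(counts, streamTime, mapValues, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
            new Rec("A", 1), new Rec("A", 4), new Rec("B", 7), // window [0,10)
            new Rec("A", 12),                                  // window [10,20)
            new Rec("A", 16),                                  // stream time 16 >= 0+10+5: [0,10) closes
            new Rec("B", 3),                                   // late for [0,10): dropped on both paths
            new Rec("B", 31));                                 // stream time 31 >= 25: [10,20) closes
        Function<Long, String> mapValues = count -> "count=" + count; // stateless mapValues
        System.out.println(eagerThenSuppress(input, mapValues)); // [A@0=count=2, B@0=count=1, A@10=count=2]
        System.out.println(emitOnClose(input, mapValues));       // same list
    }
}
```

The equivalence leans on `mapValues` being stateless and one-to-one, and on the suppression buffer knowing the window size and grace period. In this toy those are shared constants; in a topology, that is the information the "trace back to the parent node" comment in the snippet refers to.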
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
I think the following case is only doable via `suppress`:

    stream
        .groupBy(..)
        .windowBy(..)
        .aggregate(..) // results in a KTable
        .mapValues(..)
        .suppress(Suppressed.untilWindowClose) // since we can trace back to the parent node to find a window definition

Guozhang

On Mon, Mar 21, 2022 at 6:36 PM John Roesler wrote:
> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn't cover?
>
> Thanks,
> John
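Guozhang's comment that `suppress` can trace back to the parent node to find a window definition suggests a build-time check along these lines. The following is a hypothetical model, not Kafka Streams' actual topology classes: `Node`, `WindowDef`, and `validateUntilWindowClose` are illustrative names. It walks upstream from the suppressed node and rejects `untilWindowClose` when no windowed aggregation is found, which is the kind of internal validity check discussed in this thread.

```java
import java.util.Optional;

class SuppressTraceBack {
    // Hypothetical window definition attached to a windowed aggregation node.
    static final class WindowDef {
        final long sizeMs;
        final long graceMs;
        WindowDef(long sizeMs, long graceMs) { this.sizeMs = sizeMs; this.graceMs = graceMs; }
    }

    // Hypothetical topology node: a name, an upstream parent, and an optional window definition.
    static final class Node {
        final String name;
        final Node parent;      // null for a source node
        final WindowDef window; // non-null only for windowed aggregations
        Node(String name, Node parent, WindowDef window) {
            this.name = name; this.parent = parent; this.window = window;
        }
    }

    // Walk upstream from the node being suppressed until a window definition is found.
    static Optional<WindowDef> findWindow(Node node) {
        for (Node n = node; n != null; n = n.parent) {
            if (n.window != null) return Optional.of(n.window);
        }
        return Optional.empty();
    }

    // Check a suppress(untilWindowClose)-style call could run when the topology is built.
    static WindowDef validateUntilWindowClose(Node node) {
        return findWindow(node).orElseThrow(() -> new IllegalArgumentException(
            "untilWindowClose needs an upstream window definition; none found above " + node.name));
    }

    public static void main(String[] args) {
        Node source = new Node("source", null, null);
        Node windowedAgg = new Node("windowed-aggregate", source, new WindowDef(10, 5));
        Node mapped = new Node("mapValues", windowedAgg, null);
        WindowDef w = validateUntilWindowClose(mapped);
        System.out.println("size=" + w.sizeMs + " grace=" + w.graceMs); // size=10 grace=5

        Node plainTable = new Node("table-mapValues", new Node("table-source", null, null), null);
        try {
            validateUntilWindowClose(plainTable);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A fuller check would presumably also have to stop at operators that change the key (a re-grouping, for instance), since the window definition only applies while the windowed key is preserved; this toy ignores that.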
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method ‘untilWindowClose’. I might not be thinking clearly about it, though. What does untilWindowClose do that this KIP doesn't cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: `Suppressed` is used in `suppress`, whose application scope is much larger and hence more flexible. I.e., it can be used anywhere on a `KTable` (but internally we would check whether certain emit policies like `untilWindowClose` are valid or not), whereas `trigger` is, for now, only applicable to XXWindowedKStream. So I think it would not completely replace Suppressed.untilWindowClose.
>
> In the future, I'd personally still want to keep one control object for all emit policies, and if we extend Emitted to cover the other emitting policies that Suppressed covers today, we can discuss whether `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's too early.
>
> Guozhang
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Just my 2c: `Suppressed` is used in `suppress`, whose application scope is much larger and hence more flexible. I.e., it can be used anywhere on a `KTable` (but internally we would check whether certain emit policies like `untilWindowClose` are valid or not), whereas `trigger` is, for now, only applicable to XXWindowedKStream. So I think it would not completely replace Suppressed.untilWindowClose.

In the future, I'd personally still want to keep one control object for all emit policies, and if we extend Emitted to cover the other emitting policies that Suppressed covers today, we can discuss whether `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)` as a whole, but for this KIP I think it's too early.

Guozhang

On Mon, Mar 21, 2022 at 6:18 PM John Roesler wrote:
> Hi all,
>
> Thanks for the KIP, Hao!
>
> For what it’s worth, I’m also in favor of your latest framing of the API.
>
> I think the name is fine. I assume it’s inspired by Flink? It’s not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, so it might confuse some people who have deep experience with Flink. Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream processing experience, it might seem a bit esoteric compared to something self-documenting like ‘emit()’, but the docs should make it clear.
>
> One small question: it seems like this proposal is identical to Suppressed.untilWindowClose, and the KIP states that this API is superior. In that case, should we deprecate Suppressed.untilWindowClose?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > Hi Hao,
> >
> > For 2), I think it's a good idea in general to use a separate function on the Time/SessionWindowedKStream itself rather than overloading existing functions; that also achieves the effect that, for now, the emitting control applies only to windowed aggregations, as in this KIP. We can discuss the actual function names further, i.e., whether others like the name `trigger` or not. Personally, I feel `trigger` is a good one, but I'd like to see if others have opinions as well.
> >
> > Guozhang
> >
> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li wrote:
> > > Hi Guozhang,
> > >
> > > Thanks for the feedback.
> > >
> > > 1. I agree to have an `Emitted` control class with two static constructors named `onWindowClose` and `onEachUpdate`.
> > >
> > > 2. For the API function changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes the `Emitted` config and returns the same stream. Example:
> > >
> > >     stream
> > >         .groupBy(...)
> > >         .windowedBy(...)
> > >         .trigger(Emitted.onWindowClose) // N
> > >         .count()
> > >
> > > The benefits are:
> > > 1. It's simple and avoids overloading existing functions like `windowedBy`, `count`, `reduce`, or `aggregate`. In fact, to add it to the `aggregate` functions, we would need to add it to all the existing `count` and `aggregate` overloads, which is a lot.
> > > 2. It operates directly on the windowed KStream and specifies how its output should be configured. If we later need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have an `aggregate` or `windowedBy` API to keep it consistent.
> > >
> > > Hao
> > >
> > > On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang wrote:
> > > > Hello Hao,
> > > >
> > > > I prefer option 2 over the other options mainly because the added config object could potentially be used in other operators as well (not necessarily windowed operators, so it does not have to be piggy-backed on `windowedBy`; that's also why I suggested naming it just `EmitConfig` rather than `WindowConfig`).
> > > >
> > > > As for Matthias' question, I think the difference between the windowed aggregate operator and the stream-stream join operator is that, for the latter, we think emit-final should be the only correct emitting policy, and hence we should not let users configure it. If users configured it to, e.g., emit eagerly, they might get the old spurious emitting behavior, which violates the semantics.
> > > >
> > > > For option 2) itself, I have a few more thoughts:
> > > >
> > > > 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards adding the new param to the overloaded `aggregate` rather than to the overloaded `windowBy` function. The reason is that, in the long run, the emitting logic could be either window-based or non-window-based. Though for this KIP we could just add it to `XXXWindowedKStream.aggregate()`, we may want to extend it to other non-windowed operators in the future.
> > > > 2. To be consistent with other control class names, I fee
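The shape of Hao's quoted proposal, a `trigger(..)` method on the windowed stream that takes an `Emitted` config (with `onWindowClose` and `onEachUpdate` constructors) and returns the same stream type, can be sketched as below. This is a hypothetical, self-contained model rather than Kafka Streams code: `Emitted`, `WindowedStream`, and its `count()` are stand-ins, windows are 10 ms tumbling windows with no grace period, and a window closes once stream time reaches its end. Because `trigger(..)` returns the same type, `count()` keeps its signature, which is the overload argument in benefit 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TriggerSketch {
    // Hypothetical control class mirroring the proposed Emitted.onWindowClose / Emitted.onEachUpdate.
    static final class Emitted {
        enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }
        final Policy policy;
        private Emitted(Policy policy) { this.policy = policy; }
        static Emitted onWindowClose() { return new Emitted(Policy.ON_WINDOW_CLOSE); }
        static Emitted onEachUpdate() { return new Emitted(Policy.ON_EACH_UPDATE); }
    }

    // Stand-in for a time-windowed stream of (key, timestamp) records: tumbling windows, no grace.
    static final class WindowedStream {
        final List<Map.Entry<String, Long>> records;
        final long sizeMs;
        final Emitted emitted;

        WindowedStream(List<Map.Entry<String, Long>> records, long sizeMs, Emitted emitted) {
            this.records = records; this.sizeMs = sizeMs; this.emitted = emitted;
        }

        // Returns the same stream type, so count()/reduce()/aggregate() need no extra overloads.
        WindowedStream trigger(Emitted newEmitted) {
            return new WindowedStream(records, sizeMs, newEmitted);
        }

        // Count per key and window, emitting according to the configured policy.
        List<String> count() {
            TreeMap<Long, TreeMap<String, Long>> open = new TreeMap<>();
            List<String> out = new ArrayList<>();
            long streamTime = Long.MIN_VALUE;
            for (Map.Entry<String, Long> r : records) {
                long ts = r.getValue();
                streamTime = Math.max(streamTime, ts);
                long start = ts - Math.floorMod(ts, sizeMs);
                if (start + sizeMs <= streamTime) continue; // window already closed: drop
                long c = open.computeIfAbsent(start, s -> new TreeMap<>()).merge(r.getKey(), 1L, Long::sum);
                if (emitted.policy == Emitted.Policy.ON_EACH_UPDATE) {
                    out.add(r.getKey() + "@" + start + "=" + c);
                }
                // Close every window whose end is at or before stream time.
                while (!open.isEmpty() && open.firstKey() + sizeMs <= streamTime) {
                    Map.Entry<Long, TreeMap<String, Long>> w = open.pollFirstEntry();
                    if (emitted.policy == Emitted.Policy.ON_WINDOW_CLOSE) {
                        w.getValue().forEach((k, v) -> out.add(k + "@" + w.getKey() + "=" + v));
                    }
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input = List.of(
            Map.entry("A", 1L), Map.entry("A", 5L), Map.entry("A", 12L));
        WindowedStream windowed = new WindowedStream(input, 10, Emitted.onEachUpdate());
        System.out.println(windowed.count());                                  // [A@0=1, A@0=2, A@10=1]
        System.out.println(windowed.trigger(Emitted.onWindowClose()).count()); // [A@0=2]; [10,20) is still open
    }
}
```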
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi all,

Thanks for the KIP, Hao! For what it's worth, I'm also in favor of your latest framing of the API. I think the name is fine. I assume it's inspired by Flink? It's not identical to the concept of a trigger in Flink, which specifies when to evaluate the window, so it might confuse people with deep Flink experience. Then again, it seems close enough that it should be clear to casual Flink users. For people with no other stream-processing experience, it might seem a bit esoteric compared to something self-documenting like `emit()`, but the docs should make it clear.

One small question: it seems like this proposal is identical to `Suppressed.untilWindowCloses`, and the KIP states that this API is superior. In that case, should we deprecate `Suppressed.untilWindowCloses`?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> Hi Hao,
> [...]
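The question above hinges on what "emit on window close" actually forwards. As a rough, hypothetical model (plain Java, not Kafka Streams code), the final result for each window is simply the last update an eager pipeline would have emitted for it, assuming the window has closed and any late records were already dropped; this is also the kind of output `suppress(Suppressed.untilWindowCloses(...))` is meant to produce today. The `Update` record and `FinalResults` helper are illustrative names only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one eager update: (window start, aggregated value). Not a Kafka API.
record Update(long windowStart, long value) {}

final class FinalResults {
    private FinalResults() {}

    /**
     * Collapses eager per-window updates to one result per window by keeping the latest
     * update, which is what "emit on window close" would forward, assuming every listed
     * window has closed. Windows keep the order in which they were first seen.
     */
    static List<Update> lastPerWindow(List<Update> eagerUpdates) {
        Map<Long, Update> latest = new LinkedHashMap<>();
        for (Update u : eagerUpdates) {
            latest.put(u.windowStart(), u); // a later update overwrites an earlier one
        }
        return new ArrayList<>(latest.values());
    }
}
```

For example, eager updates `(0,1), (0,2), (10,1), (0,3)` collapse to one final result per window: `(0,3)` and `(10,1)`.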
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Hao,

For 2), I think it's a good idea in general to use a separate function on Time/SessionWindowedKStream itself rather than overloading existing functions. That achieves the same effect while, for now, limiting the emit control to windowed aggregations as in this KIP. We can discuss the actual function name further, e.g. whether others like the name `trigger` or not. For myself, I feel `trigger` is a good one, but I'd like to see if others have opinions as well.

Guozhang

On Mon, Mar 21, 2022 at 5:18 PM Hao Li wrote:
> Hi Guozhang,
> [...]
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Guozhang,

Thanks for the feedback.

1. I agree to have an `Emitted` control class and two static constructors named `onWindowClose` and `onEachUpdate`.

2. For the API function changes, I'm thinking of adding a new function called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes an `Emitted` config and returns the same stream. Example:

stream
    .groupBy(...)
    .windowedBy(...)
    .trigger(Emitted.onWindowClose()) // N
    .count()

The benefits are:
1. It's simple and avoids adding overloads of existing functions like `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the `aggregate` functions, we would need to add it to all existing `count` and `aggregate` overloads, which is a lot.
2. It operates directly on the windowed KStream and specifies how its output should be emitted. If we later need to add this to other types of streams, we can reuse the same `trigger` API, whereas other types of streams/tables may not have `aggregate` or `windowedBy` APIs that would let us stay consistent.

Hao

On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang wrote:
> Hello Hao,
> [...]
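A minimal sketch of the API shape proposed above, using hypothetical stand-in classes (`Emitted` and `WindowedStreamSketch` are illustrative, not the KIP's final API or Kafka Streams code). What it shows is that `trigger(...)` returns the same stream type, so every existing terminal operation (`count`, `reduce`, ...) can read the chosen strategy without gaining new overloads. The terminal operations only return a description, and the eager default is an assumption.

```java
import java.util.Objects;

/** Hypothetical control class with the two static constructors proposed in the thread. */
final class Emitted {
    enum Mode { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Mode mode;

    private Emitted(Mode mode) { this.mode = mode; }

    static Emitted onWindowClose() { return new Emitted(Mode.ON_WINDOW_CLOSE); }

    static Emitted onEachUpdate() { return new Emitted(Mode.ON_EACH_UPDATE); }

    Mode mode() { return mode; }
}

/**
 * Hypothetical stand-in for a windowed stream: trigger() returns the same type, so the
 * chosen strategy flows into whichever terminal operation is called next.
 */
final class WindowedStreamSketch {
    private final Emitted emitted;

    // Assumed default: today's eager behavior.
    WindowedStreamSketch() { this(Emitted.onEachUpdate()); }

    private WindowedStreamSketch(Emitted emitted) { this.emitted = Objects.requireNonNull(emitted); }

    /** Returns a new stream of the same type that carries the given strategy. */
    WindowedStreamSketch trigger(Emitted emitted) { return new WindowedStreamSketch(emitted); }

    // Terminal operations only describe what they would build, to keep the sketch small.
    String count() { return "count/" + emitted.mode(); }

    String reduce() { return "reduce/" + emitted.mode(); }
}
```

Returning a new instance rather than mutating `this` keeps the stream objects immutable, so two differently triggered aggregations can safely branch off the same windowed stream.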
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hello Hao,

I prefer option 2 over the other options, mainly because the added config object could potentially be used in other operators as well. Those need not be windowed operators, so the config need not be piggy-backed on `windowedBy`; that's also why I suggested naming it `EmitConfig` rather than `WindowConfig`.

As for Matthias' question, I think the difference between the windowed aggregate operator and the stream-stream join operator is that, for the latter, we think emit-final is the only correct emitting policy, and hence we should not let users configure it. If users configured it to, e.g., emit eagerly, they might get the old spurious emitting behavior, which violates the semantics.

For option 2) itself, I have a few more thoughts:

1. Thinking about Matthias' suggestions, I'm also leaning a bit towards adding the new param to an overloaded `aggregate` rather than an overloaded `windowedBy` function. The reason is that, in the long run, the emitting logic could be either window-based or non-window-based. Though for this KIP we could just add it to `XXXWindowedKStream.aggregate()`, we may want to extend it to other non-windowed operators in the future.
2. To be consistent with other control class names, I feel maybe we can name it "Emitted", not "EmitConfig".
3. Following the first comment, I think we can name the static constructors "onWindowClose" and "onEachUpdate".

The resulting code pattern would look like this:

stream
    .groupBy(..)
    .windowedBy(TimeWindows..)
    .count(Emitted.onWindowClose())

WDYT?

On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax wrote:
> [...]
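The "spurious emitting behavior" mentioned above can be illustrated with a toy, in-memory model of a left stream-stream join for a single key. This is a hedged sketch, not the Kafka Streams join implementation: it assumes records arrive in timestamp order, a symmetric and inclusive join window, zero grace, and the hypothetical names `Rec` and `LeftJoinModel`. With eager emission, a left record without a match yet is forwarded as `value,null` and later again as `value,match`; with emit-final, `value,null` is held back until the join window has closed without a match.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// One input record for a single key: which side it came from, its value, and its timestamp.
record Rec(boolean left, String value, long ts) {}

/** Toy model of a left stream-stream join for one key; NOT the Kafka Streams implementation. */
final class LeftJoinModel {
    private LeftJoinModel() {}

    private static boolean joins(Rec a, Rec b, long windowMs) {
        return Math.abs(a.ts() - b.ts()) <= windowMs; // symmetric, inclusive join window
    }

    /** Eager emission: an unmatched left record is forwarded at once as "value,null". */
    static List<String> eager(List<Rec> input, long windowMs) {
        List<String> out = new ArrayList<>();
        List<Rec> lefts = new ArrayList<>();
        List<Rec> rights = new ArrayList<>();
        for (Rec r : input) {
            if (r.left()) {
                boolean matched = false;
                for (Rec o : rights) {
                    if (joins(r, o, windowMs)) { out.add(r.value() + "," + o.value()); matched = true; }
                }
                if (!matched) out.add(r.value() + ",null"); // may later prove spurious
                lefts.add(r);
            } else {
                for (Rec o : lefts) {
                    if (joins(o, r, windowMs)) out.add(o.value() + "," + r.value());
                }
                rights.add(r);
            }
        }
        return out;
    }

    /** Emit-final: "value,null" is forwarded only once the left record's window closed unmatched. */
    static List<String> emitFinal(List<Rec> input, long windowMs) {
        List<String> out = new ArrayList<>();
        List<Rec> lefts = new ArrayList<>();
        List<Rec> rights = new ArrayList<>();
        List<Rec> unmatched = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts());
            if (r.left()) {
                boolean matched = false;
                for (Rec o : rights) {
                    if (joins(r, o, windowMs)) { out.add(r.value() + "," + o.value()); matched = true; }
                }
                if (!matched) unmatched.add(r);
                lefts.add(r);
            } else {
                for (Rec o : lefts) {
                    if (joins(o, r, windowMs)) {
                        out.add(o.value() + "," + r.value());
                        unmatched.removeIf(p -> p == o); // this left record now has a match
                    }
                }
                rights.add(r);
            }
            // Flush left records whose join window has closed (grace assumed to be zero).
            for (Iterator<Rec> it = unmatched.iterator(); it.hasNext(); ) {
                Rec p = it.next();
                if (streamTime > p.ts() + windowMs) {
                    out.add(p.value() + ",null");
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

For the input `a@0 (left), x@5 (right), b@20 (left), y@40 (right)` with a 10 ms window, the eager variant yields `a,null`, `a,x`, `b,null` (the first one spurious), while the emit-final variant yields only `a,x` and `b,null`.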
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
> `allowedLateness` may not be a good name. What I have in mind is to use
> this to control how frequently we try to emit final results. Maybe it's
> more flexible to be used as config in properties as we don't need to
> recompile DSL to change it.

I see; making it a config seems better. Frankly, I am not even sure if we need a config at all, or if we can just hard-code it. For the stream-stream left/outer join fix, there is only an internal config but no public config either.

Option 1: Is this your proposal?

stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
    .configure(EmitConfig.emitFinal())
    .count()

That does not change my argument that it seems misplaced from an API-flow point of view. Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. It might be good if others could chime in, too. I think I slightly prefer option 2 over option 3.

-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:
> Thanks for the feedback Matthias.
> [...]
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use this to control how frequently we try to emit final results. Maybe it's more flexible as a config in the properties, since then we don't need to recompile the DSL to change it.

For option 1, I intend to use `emitFinal` to configure how `TimeWindowedKStream` should be output to a `KTable` after aggregation. But `emitFinal` is not an action on the `TimeWindowedKStream` interface. Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, the config can be created using `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.

For option 3, it would be something like `TimeWindows(..., EmitConfig emitConfig)`.

As for putting `EmitConfig` in the aggregation operator, I think it doesn't control how we do the aggregation, but how we output to the `KTable`. That's why I feel option 1 makes more sense, as it applies to `TimeWindowedKStream`. But I'm also OK with option 2.

Hao

On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax wrote:
> Thanks for the KIP.
> [...]
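One way the "how frequently we try to emit final results" knob could be read from runtime properties, so that changing it requires no DSL recompile, is sketched below. Everything here is hypothetical: the property key, the 1000 ms default, and the `EmitCheckThrottle` class are made up for illustration and are not Kafka Streams configs. The throttle only decides, based on wall-clock time, whether the potentially expensive scan for closed windows should run on this call.

```java
import java.util.Properties;

/**
 * Hypothetical sketch: run the scan for closed windows at most once per configured
 * interval of wall-clock time. The property key is made up; it is not a real config.
 */
final class EmitCheckThrottle {
    static final String INTERVAL_KEY = "example.emit.check.interval.ms"; // hypothetical key
    static final long DEFAULT_INTERVAL_MS = 1000L;                       // hypothetical default

    private final long intervalMs;
    private long lastCheckMs = Long.MIN_VALUE; // MIN_VALUE means "never checked yet"

    EmitCheckThrottle(Properties props) {
        this.intervalMs = Long.parseLong(
                props.getProperty(INTERVAL_KEY, Long.toString(DEFAULT_INTERVAL_MS)));
        if (intervalMs <= 0) throw new IllegalArgumentException("interval must be positive");
    }

    /** Returns true if a scan should run now, and records that a check happened. */
    boolean shouldCheck(long nowMs) {
        if (lastCheckMs != Long.MIN_VALUE && nowMs - lastCheckMs < intervalMs) {
            return false; // too soon since the last scan
        }
        lastCheckMs = nowMs;
        return true;
    }
}
```

Hard-coding the interval, which the thread also considers, would amount to always using the default value.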
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness` parameter, because the grace period is already defined on the window itself?

(On the other hand, if I think about it once more, maybe the grace period is actually not a property of the window but a property of the aggregation operator? _thinking_)

From an API-flow point of view, option 1 might not be desirable IMHO:

stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
    .emitFinal()
    .count()

The call to `emitFinal()` does not seem to be in the right place for this case?

Option 2 might work (I think we need to discuss a few details of the API though):

stream
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(...),
        EmitConfig.emitFinal() // just made this up; it's not in the KIP
    )
    .count()

I made up the `WindowConfig.emitFinal()` call; from the KIP it's unclear what API you have in mind. `EmitFinalConfig` has no public constructor nor any builder method.

For option 3, I am not sure what you really have in mind. Can you give a concrete example (similar to the above) of how users would write their code?

Did you consider actually passing the `EmitConfig` into the aggregation operator? In the end, it seems to be a property not of the window definition or the windowing step, but of the actual operator:

stream
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeWithNoGrace(...)
    )
    .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger in this case though...

-Matthias

On 3/14/22 9:21 PM, Hao Li wrote: Thanks Guozhang! 1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies less places. What do you think of option 1 which doesn't change the current `windowedBy` api but configures `EmitConfig` separately. The benefit of option 1 is if we need to configure something else later, we don't need to pile them on `windowedBy` but can add separate APIs. 2.
I added it to `Stores` mainly to conform to https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231. But we can also create an internal API to do that without modifying `Stores`. Hao On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang wrote: Hello Hao, Thanks for the proposal. I have some preferences among the options here, so I will copy them here: I'm now wondering whether it's better not to add this new config to each of the Window interfaces, but instead add it to the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a Configured class like Grouped, Suppressed, etc. For example, let's call it Emitted, which for now would have just a single constructor, Emitted.atWindowClose, whose semantics are the same as emitFinal == true. I think the benefits are: 1) you do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is a smaller scope for now, and also more extensible for any future changes. 2) With a config interface, we maintain its extensibility as well as being able to reuse this Emitted interface for other operators if we want to expand to them. So in general I'm leaning towards option 2). For that, some more detailed comments: 1) If we want to reuse that config object for other non-window stateful operations, I think naming it `EmitConfig` is probably better than `WindowConfig`. 2) I saw your PR (https://github.com/apache/kafka/pull/11892) in which you are also proposing to add new stores to the public factory Stores, but it's not included in the KIP. Is that intentional? Personally, I think that although we may eventually want to add a new store type to the public APIs, for this KIP we may not have to add them, and can delay that until after we've learned the best way to lay it out. LMK what you think?
Guozhang On Fri, Mar 11, 2022 at 2:13 PM Hao Li wrote: Hi Dev team, I'd like to start a discussion thread on Kafka Streams KIP-825: https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced This KIP is aimed to add new APIs to support outputting final aggregated results for windowed aggregations. I listed several options there and I'm looking forward to your feedback. Thanks, Hao -- -- Guozhang
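To make the emit-on-update vs. emit-final difference concrete, here is a toy, self-contained model (not Kafka Streams code) of a tumbling-window count where the emit mode is handed to the aggregation itself, in the spirit of passing the config to the operator. `EmitMode` and `WindowedCountSketch` are hypothetical names. The close rule (a window closes once stream time reaches window end plus grace, after which late records for it are dropped) is an assumption of the sketch, chosen to show why the grace period matters for when a final result can be emitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical emit modes for this sketch; not Kafka Streams classes.
enum EmitMode { ON_UPDATE, ON_WINDOW_CLOSE }

final class WindowedCountSketch {
    private WindowedCountSketch() {}

    /**
     * Counts records per tumbling window [start, start + size) and returns what
     * would be sent downstream, as "windowStart=count" strings in emission order.
     * Assumes non-negative timestamps. A window is treated as closed once
     * stream time >= windowEnd + grace; records for closed windows are dropped.
     */
    static List<String> count(long[] timestamps, long size, long grace, EmitMode mode) {
        TreeMap<Long, Long> open = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % size);
            if (start + size + grace <= streamTime) {
                continue; // too late: this window is already closed
            }
            long updated = open.merge(start, 1L, Long::sum);
            if (mode == EmitMode.ON_UPDATE) {
                emitted.add(start + "=" + updated); // every update goes downstream
            }
            // Close every window whose end + grace has been reached. The TreeMap keeps
            // starts ordered, so we can stop at the first window that is still open.
            Iterator<Map.Entry<Long, Long>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> w = it.next();
                if (w.getKey() + size + grace > streamTime) {
                    break;
                }
                if (mode == EmitMode.ON_WINDOW_CLOSE) {
                    emitted.add(w.getKey() + "=" + w.getValue()); // one final result
                }
                it.remove();
            }
        }
        return emitted;
    }
}
```

With timestamps 1, 5, 12, 3, 25, a window size of 10, and no grace, emit-on-update sends `0=1, 0=2, 10=1, 20=1` (the record at 3 is dropped as late), while emit-final sends only `0=2, 10=1`. With a grace of 5, the late record is accepted and emit-final sends `0=3, 10=1`.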
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Thanks Guozhang! 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2 modifies fewer places. What do you think of option 1, which doesn't change the current `windowedBy` API but configures `EmitConfig` separately? The benefit of option 1 is that if we need to configure something else later, we don't need to pile it onto `windowedBy` but can add separate APIs. 2. I added it to `Stores` mainly to conform to https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231. But we can also create an internal API to do that without modifying `Stores`. Hao On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang wrote: > Hello Hao, > > Thanks for the proposal, I have some preference among the options here so I > will copy them here: > > I'm now thinking if it's better to not add this new config on each of the > Window interfaces, but instead add that at the KGroupedStream#windowedBy > function. Also instead of adding just a boolean flag, maybe we can add a > Configured class like Grouped, Suppressed, etc, e.g. let's call it a > Emitted which for now would just have a single construct as > Emitted.atWindowClose whose semantics is the same as emitFinal == true. I > think the benefits are: > > 1) you do not need to modify multiple Window classes, but just overload one > windowedBy function with a second param. This is less of a scope for now, > and also more extensible for any future changes. > > 2) With a config interface, we maintain its extensibility as well as being > able to reuse this Emitted interface for other operators if we wanted to > expand to. > > > > So in general I'm leaning towards option 2). For that, some more detailed > comments: > > 1) If we want to reuse that config object for other non-window stateful > operations, I think naming it as `EmitConfig` is probably better than > `WindowConfig`.
> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are > also proposing to add new stores into the public factory Stores, but it's > not included in the KIP. Is that intentional? Personally I think that > although we may eventually want to add a new store type to the public APIs, > for this KIP maybe we do not have to add them but can delay for later after > we've learned the best way to layout. LMK what do you think? > > > > Guozhang > > > > On Fri, Mar 11, 2022 at 2:13 PM Hao Li wrote: > > > Hi Dev team, > > > > I'd like to start a discussion thread on Kafka Streams KIP-825: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced > > > > This KIP is aimed to add new APIs to support outputting final aggregated > > results for windowed aggregations. I listed several options there and I'm > > looking forward to your feedback. > > > > Thanks, > > Hao > > > > > -- > -- Guozhang > -- Thanks, Hao
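A toy sketch of the API shape option 1 implies: `windowedBy(...)` keeps its current one-argument form, and the emit behaviour is set by a separate, optional step. `WindowedStep`, `EmitMode`, and `emitStrategy` are hypothetical names for illustration, not Kafka Streams types, and a `Duration` stands in for a real `Windows` object.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical emit modes; not Kafka Streams classes.
enum EmitMode { ON_UPDATE, ON_WINDOW_CLOSE }

// Hypothetical stand-in for the object returned by windowedBy(...).
final class WindowedStep {
    private final Duration size;
    private final EmitMode emitMode;

    private WindowedStep(Duration size, EmitMode emitMode) {
        this.size = Objects.requireNonNull(size);
        this.emitMode = Objects.requireNonNull(emitMode);
    }

    // Plays the role of the existing windowedBy(...): no new parameter.
    static WindowedStep windowedBy(Duration size) {
        // Default: emit on every update (ignoring caching), assumed to match today's behaviour.
        return new WindowedStep(size, EmitMode.ON_UPDATE);
    }

    // Option 1: a separate, optional step. Returns a new object, so the receiver
    // is unchanged; later options could get their own methods the same way.
    WindowedStep emitStrategy(EmitMode mode) {
        return new WindowedStep(size, mode);
    }

    EmitMode emitMode() {
        return emitMode;
    }

    // Describes the aggregation that would be built from this configuration.
    String count() {
        return "count(window=" + size.toMillis() + "ms, emit=" + emitMode + ")";
    }
}
```

The trade-off Matthias raised shows up here as ordering: nothing in this shape stops the emit step from being read as a property of the windowing step rather than of `count()`.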
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hello Hao, Thanks for the proposal. I have some preferences among the options, so I will copy them here: I'm now wondering whether it's better not to add this new config to each of the Window interfaces, but instead add it to the KGroupedStream#windowedBy function. Also, instead of adding just a boolean flag, maybe we can add a config class like Grouped, Suppressed, etc. Let's call it Emitted; for now it would have just a single static constructor, Emitted.atWindowClose, whose semantics are the same as emitFinal == true. I think the benefits are: 1) You do not need to modify multiple Window classes, but just overload one windowedBy function with a second param. This is a smaller scope for now, and also more extensible for future changes. 2) With a config interface, we maintain its extensibility and can also reuse this Emitted interface for other operators if we want to expand to them. So in general I'm leaning towards option 2. For that, I have some more detailed comments: 1) If we want to reuse that config object for other non-window stateful operations, I think naming it `EmitConfig` is probably better than `WindowConfig`. 2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you are also proposing to add new stores to the public factory Stores, but they are not included in the KIP. Is that intentional? Personally, I think that although we may eventually want to add a new store type to the public APIs, for this KIP we may not have to add them and can defer that until we've learned the best way to lay them out. LMK what you think. Guozhang On Fri, Mar 11, 2022 at 2:13 PM Hao Li wrote: > Hi Dev team, > > I'd like to start a discussion thread on Kafka Streams KIP-825: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced > > This KIP is aimed to add new APIs to support outputting final aggregated > results for windowed aggregations.
I listed several options there and I'm > looking forward to your feedback. > > Thanks, > Hao > -- -- Guozhang
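A minimal sketch of the config-object pattern described for option 2: an `Emitted` class in the style of `Grouped`/`Suppressed` (private constructor, static factory), plus an overloaded `windowedBy` that takes it as a second parameter while the one-argument form keeps its behaviour. Everything here is hypothetical and only illustrates the shape; `GroupedStreamSketch` stands in for `KGroupedStream`, and a `Duration` stands in for a `Windows` object.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical config object in the style of Grouped/Suppressed; not a Kafka class.
final class Emitted {
    private final boolean finalResultsOnly;

    private Emitted(boolean finalResultsOnly) { // private: use the static factory
        this.finalResultsOnly = finalResultsOnly;
    }

    // The single factory from the proposal; same semantics as emitFinal == true.
    // Further factories could be added later without touching any Window class.
    static Emitted atWindowClose() {
        return new Emitted(true);
    }

    boolean finalResultsOnly() {
        return finalResultsOnly;
    }
}

// Hypothetical stand-in for KGroupedStream.
final class GroupedStreamSketch {
    // Existing form, behaviour unchanged: results are emitted on every update.
    String windowedBy(Duration size) {
        return describe(size, false);
    }

    // Option 2: one overload with a second parameter.
    String windowedBy(Duration size, Emitted emitted) {
        return describe(size, Objects.requireNonNull(emitted).finalResultsOnly());
    }

    private static String describe(Duration size, boolean finalOnly) {
        return "windowedBy(" + size.toMillis() + "ms, finalOnly=" + finalOnly + ")";
    }
}
```

Because `Emitted` is its own type rather than a boolean, the same object could in principle be accepted by other stateful operators later, which is the reuse argument made above.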
[DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced
Hi Dev team, I'd like to start a discussion thread on Kafka Streams KIP-825: https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced This KIP aims to add new APIs to support outputting final aggregated results for windowed aggregations. I listed several options there, and I'm looking forward to your feedback. Thanks, Hao
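With hopping windows, one record updates several windows, and under a "final results only" policy each of those windows would produce its own single result when it closes. The sketch below computes both pieces for non-negative timestamps. The window-assignment arithmetic is modeled on how Kafka Streams' `TimeWindows` assigns records to hopping windows, but `HoppingWindowSketch` and its methods are hypothetical, and the close rule (window end plus grace) is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindowSketch {
    private HoppingWindowSketch() {}

    /** Start times of every hopping window [start, start + size) that contains ts (ts >= 0). */
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers ts, clamped at 0.
        long start = (Math.max(0, ts - size + advance) / advance) * advance;
        for (; start <= ts; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    /** Stream time at which a window's final result could be emitted: end + grace. */
    static long closeTime(long start, long size, long grace) {
        return start + size + grace;
    }
}
```

For example, with a size of 10 and an advance of 5, a record at timestamp 12 falls into the windows starting at 5 and 10, and with a grace of 2 the window starting at 5 could emit its final result once stream time reaches 17.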