Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-24 Thread John Roesler
Thanks, Hao,

The KIP looks good to me overall, so I'll go ahead and vote.
I did notice a couple of typos, though:

> static EmitStrategy onWindowClose() {
> return new WindowCloseTrigger();
> }

should return WindowCloseStrategy. The other strategy also
references WindowUpdateTrigger, which I'm guessing should be
WindowUpdateStrategy, and which should also be specified.

More generally, it seems a bit roundabout to define an
interface that specifies an enum along with static methods
that return classes that themselves return enum values from
the interface. I'm guessing this is all by way of just
making the API look nice.
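
For concreteness, here is a minimal sketch of the shape described above. It is
only an illustration under assumptions: the `StrategyType` enum, the `type()`
method, and the two strategy classes are named after this thread and the KIP
draft, and may not match what is finally merged.

```java
// Illustrative sketch only: names follow this thread, not necessarily the
// final Kafka Streams API.
interface EmitStrategy {

    // The enum that the interface itself specifies.
    enum StrategyType { ON_WINDOW_CLOSE, ON_WINDOW_UPDATE }

    // Each implementation reports which enum value it stands for.
    StrategyType type();

    // Static factories exist mainly so that call sites read nicely, e.g.
    // .emitStrategy(EmitStrategy.onWindowClose())
    static EmitStrategy onWindowClose() {
        return new WindowCloseStrategy();
    }

    static EmitStrategy onWindowUpdate() {
        return new WindowUpdateStrategy();
    }
}

// Hypothetical implementation returned by onWindowClose().
class WindowCloseStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_CLOSE;
    }
}

// Hypothetical implementation returned by onWindowUpdate().
class WindowUpdateStrategy implements EmitStrategy {
    @Override
    public StrategyType type() {
        return StrategyType.ON_WINDOW_UPDATE;
    }
}
```

The roundabout part is visible here: the factory returns a class whose only
job is to hand back an enum value that the interface already declares.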

Both of those issues seem relatively minor, though, so I'm
comfortable casting my vote at this point.

Thanks for all the good work here,
-John

On Wed, 2022-03-23 at 20:51 -0700, Hao Li wrote:
> Hi all,
> 
> I just updated the KIP with option 1 as design and put option 2 and 3 in
> rejected alternatives. Since Matthias is strongly against `trigger`,
> I adopted the proposed `EmitStrategy` and dropped the "with" in the
> function name. So it's like this:
> 
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .emitStrategy(EmitStrategy.onWindowClose())
>   .aggregate(..)
>   .mapValues(..)
> 
> I used `onWindowClose` since `EmitStrategy` is meant to be an interface.
> 
> Hao
> 
> On Wed, Mar 23, 2022 at 6:35 PM Matthias J. Sax  wrote:
> 
> > Wow. Quite a thread... #namingIsHard :D
> > 
> > I won't repeat all arguments which are all very good ones. I can just
> > state my personal favorite option:
> > 
> > stream
> >   .groupBy(..)
> >   .windowedBy(..)
> >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >   .aggregate(..)
> >   .mapValues(..)
> > 
> > > It seems to be the best compromise / trade-off across the board.
> > > Personally, I would strongly advocate against using `trigger()`!
> > 
> > 
> > -Matthias
> > 
> > 
> > On 3/23/22 4:38 PM, Guozhang Wang wrote:
> > > Hao is right, I think that's the hindsight we have for `suppress` which
> > > since can be applied anywhere for a K(windowed)Table, incurs an awkward
> > > programming flexibility and I felt it's better to make its application
> > > scope more constraint.
> > > 
> > > And I also agree with John that, unless any of us feel strongly about any
> > > options, Hao could make the final call about the namings.
> > > 
> > > 
> > > Guozhang
> > > 
> > > On Wed, Mar 23, 2022 at 1:49 PM Hao Li  wrote:
> > > 
> > > > For
> > > > 
> > > > stream
> > > > >   .groupBy(..)
> > > > >   .windowedBy(..)
> > > > >   .aggregate(..)
> > > > >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > > >   .mapValues(..)
> > > > > 
> > > > > I think after `aggregate` it's already a table and then the emit strategy
> > > > > is too late to control
> > > > > how windowed stream is outputted to table. This is the concern Guozhang
> > > > > raised about having this in existing `suppress` operator as well.
> > > > 
> > > > Thanks,
> > > > Hao
> > > > 
> > > > > On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna wrote:
> > > > 
> > > > > Hi,
> > > > > 
> > > > > Thank you for your answers to my questions!
> > > > > 
> > > > > I see the argument about conciseness of configuring a stream with
> > > > > methods instead of config objects. I just miss a bit the descriptive
> > > > > aspect.
> > > > > 
> > > > > What about
> > > > > 
> > > > > stream
> > > > > >   .groupBy(..)
> > > > > >   .windowedBy(..)
> > > > > >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > > > >   .aggregate(..)
> > > > > >   .mapValues(..)
> > > > > > 
> > > > > > I have also another question. Why should emitting of results be
> > > > > > controlled by the window level api? If I want to emit results for each
> > > > > > input record the emit strategy is quite independent from the window. So
> > > > > > I somehow share Matthias' and Guozhang's concern that the emit strategy
> > > > > > seems misplaced there.
> > > > > > 
> > > > > > What are the arguments against?
> > > > > > 
> > > > > > stream
> > > > > >   .groupBy(..)
> > > > > >   .windowedBy(..)
> > > > > >   .aggregate(..)
> > > > > >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> > > > > >   .mapValues(..)
> > > > > > 
> > > > > > 
> > > > > > A final administrative request: Hao, could you please add the rejected
> > > > > > alternatives to the KIP so that future us will know why we rejected them?
> > > > > 
> > > > > Best,
> > > > > Bruno
> > > > > 
> > > > > On 23.03.22 19:38, John Roesler wrote:
> > > > > > Hi all,
> > > > > > 
> > > > > > I can see both sides of this.
> > > > > > 
> > > > > > On one hand, when we say
> > > > > > "stream.groupBy().windowBy().count()", it seems like we're
> > > > > > telling KS to take the raw stream, group it based on key,
> > > > > > then window it based on time, and then compute an
> > > > > > aggregation on the windows. In that model, "trigger()" would
> > > > > > have to mean something like "trigger it"

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Hao Li
Hi all,

I just updated the KIP with option 1 as the design and moved options 2 and 3 to
the rejected alternatives. Since Matthias is strongly against `trigger`,
I adopted the proposed `EmitStrategy` and dropped the "with" from the
function name. So it looks like this:

stream
  .groupBy(..)
  .windowedBy(..)
  .emitStrategy(EmitStrategy.onWindowClose())
  .aggregate(..)
  .mapValues(..)

I used `onWindowClose` since `EmitStrategy` is meant to be an interface.

Hao

On Wed, Mar 23, 2022 at 6:35 PM Matthias J. Sax  wrote:

> Wow. Quite a thread... #namingIsHard :D
>
> I won't repeat all arguments which are all very good ones. I can just
> state my personal favorite option:
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .aggregate(..)
>   .mapValues(..)
>
> It seems to be the best compromise / trade-off across the board.
> Personally, I would strongly advocate against using `trigger()`!
>
>
> -Matthias
>
>
> On 3/23/22 4:38 PM, Guozhang Wang wrote:
> > Hao is right, I think that's the hindsight we have for `suppress` which
> > since can be applied anywhere for a K(windowed)Table, incurs an awkward
> > programming flexibility and I felt it's better to make its application
> > scope more constraint.
> >
> > And I also agree with John that, unless any of us feel strongly about any
> > options, Hao could make the final call about the namings.
> >
> >
> > Guozhang
> >
> > On Wed, Mar 23, 2022 at 1:49 PM Hao Li  wrote:
> >
> >> For
> >>
> >> stream
> >>   .groupBy(..)
> >>   .windowedBy(..)
> >>   .aggregate(..)
> >>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>   .mapValues(..)
> >>
> >> I think after `aggregate` it's already a table and then the emit strategy
> >> is too late to control
> >> how windowed stream is outputted to table. This is the concern Guozhang
> >> raised about having this in existing `suppress` operator as well.
> >>
> >> Thanks,
> >> Hao
> >>
> >> On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna wrote:
> >>
> >>> Hi,
> >>>
> >>> Thank you for your answers to my questions!
> >>>
> >>> I see the argument about conciseness of configuring a stream with
> >>> methods instead of config objects. I just miss a bit the descriptive
> >>> aspect.
> >>>
> >>> What about
> >>>
> >>> stream
> >>>   .groupBy(..)
> >>>   .windowedBy(..)
> >>>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>>   .aggregate(..)
> >>>   .mapValues(..)
> >>>
> >>> I have also another question. Why should emitting of results be
> >>> controlled by the window level api? If I want to emit results for each
> >>> input record the emit strategy is quite independent from the window. So
> >>> I somehow share Matthias' and Guozhang's concern that the emit strategy
> >>> seems misplaced there.
> >>>
> >>> What are the arguments against?
> >>>
> >>> stream
> >>>   .groupBy(..)
> >>>   .windowedBy(..)
> >>>   .aggregate(..)
> >>>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >>>   .mapValues(..)
> >>>
> >>>
> >>> A final administrative request: Hao, could you please add the rejected
> >>> alternatives to the KIP so that future us will know why we rejected them?
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 23.03.22 19:38, John Roesler wrote:
> >>>> Hi all,
> >>>>
> >>>> I can see both sides of this.
> >>>>
> >>>> On one hand, when we say
> >>>> "stream.groupBy().windowBy().count()", it seems like we're
> >>>> telling KS to take the raw stream, group it based on key,
> >>>> then window it based on time, and then compute an
> >>>> aggregation on the windows. In that model, "trigger()" would
> >>>> have to mean something like "trigger it", which doesn't
> >>>> really make sense, since we aren't "triggering" the
> >>>> aggregation (then again, to an outside observer, it would
> >>>> appear that way... food for thought).
> >>>>
> >>>> Another way to look at it is that all we're really doing is
> >>>> configuring a windowed aggregation on the stream, and we're
> >>>> doing it with a progressive builder interface. In other
> >>>> words, the above is just a progressive builder for
> >>>> configuring an operation like
> >>>> "stream.aggregate(groupingConfig, windowingConfig,
> >>>> countFn)". Under the latter interpretation of the DSL, it
> >>>> makes perfect sense to add more optional progressive builder
> >>>> methods like trigger() to the WindowedKStream interfaces.
> >>>>
> >>>> Since part of the motivation for choosing the word "trigger"
> >>>> here is to stay close to what Flink defines, I'll also point
> >>>> out that Flink's syntax is also
> >>>> "stream.keyBy().window().trigger().aggregate()". Not that
> >>>> their API is the holy grail or anything, but it's at least
> >>>> an indication that this API isn't a horrible mistake.
> >>>>
> >>>> All other things being equal, I also prefer to leave
> >>>> tie-breakers in the hands of the contributor. So, if we've all
> >>>

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Matthias J. Sax

Wow. Quite a thread... #namingIsHard :D

I won't repeat all arguments which are all very good ones. I can just 
state my personal favorite option:


stream
 .groupBy(..)
 .windowedBy(..)
 .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
 .aggregate(..)
 .mapValues(..)

It seems to be the best compromise / trade-off across the board.
Personally, I would strongly advocate against using `trigger()`!



-Matthias


On 3/23/22 4:38 PM, Guozhang Wang wrote:

Hao is right, I think that's the hindsight we have for `suppress` which
since can be applied anywhere for a K(windowed)Table, incurs an awkward
programming flexibility and I felt it's better to make its application
scope more constraint.

And I also agree with John that, unless any of us feel strongly about any
options, Hao could make the final call about the namings.


Guozhang

On Wed, Mar 23, 2022 at 1:49 PM Hao Li  wrote:


For

stream
   .groupBy(..)
   .windowedBy(..)
   .aggregate(..)
   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
   .mapValues(..)

I think after `aggregate` it's already a table and then the emit strategy
is too late to control
how windowed stream is outputted to table. This is the concern Guozhang
raised about having this in existing `suppress` operator as well.

Thanks,
Hao

On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna  wrote:


Hi,

Thank you for your answers to my questions!

I see the argument about conciseness of configuring a stream with
methods instead of config objects. I just miss a bit the descriptive
aspect.

What about

stream
   .groupBy(..)
   .windowedBy(..)
   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
   .aggregate(..)
   .mapValues(..)

I have also another question. Why should emitting of results be
controlled by the window level api? If I want to emit results for each
input record the emit strategy is quite independent from the window. So
I somehow share Matthias' and Guozhang's concern that the emit strategy
seems misplaced there.

What are the arguments against?

stream
   .groupBy(..)
   .windowedBy(..)
   .aggregate(..)
   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
   .mapValues(..)


A final administrative request: Hao, could you please add the rejected
alternatives to the KIP so that future us will know why we rejected them?

Best,
Bruno

On 23.03.22 19:38, John Roesler wrote:

Hi all,

I can see both sides of this.

On one hand, when we say
"stream.groupBy().windowBy().count()", it seems like we're
telling KS to take the raw stream, group it based on key,
then window it based on time, and then compute an
aggregation on the windows. In that model, "trigger()" would
have to mean something like "trigger it", which doesn't
really make sense, since we aren't "triggering" the
aggregation (then again, to an outside observer, it would
appear that way... food for thought).

Another way to look at it is that all we're really doing is
configuring a windowed aggregation on the stream, and we're
doing it with a progressive builder interface. In other
words, the above is just a progressive builder for
configuring an operation like
"stream.aggregate(groupingConfig, windowingConfig,
countFn)". Under the latter interpretation of the DSL, it
makes perfect sense to add more optional progressive builder
methods like trigger() to the WindowedKStream interfaces.

Since part of the motivation for choosing the word "trigger"
here is to stay close to what Flink defines, I'll also point
out that Flink's syntax is also
"stream.keyBy().window().trigger().aggregate()". Not that
their API is the holy grail or anything, but it's at least
an indication that this API isn't a horrible mistake.

All other things being equal, I also prefer to leave tie-
breakers in the hands of the contributor. So, if we've all
said our piece and Hao still prefers option 1, then (as long
as we don't think it's a horrible mistake), I think we
should just let him go for it.

Speaking of which, after reviewing the responses regarding
deprecating `Suppressed#onWindowClose`, I still think we
should just go ahead and deprecate it. Although it's not
expressed exactly the same way, it still does exactly the
same thing, or so close that it seems confusing to keep
both. But again, if Hao really prefers to keep both, I won't
insist on it :)

Thanks all,
-John

On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:

Thanks Bruno!

Argument for option 1 is:
1. Concise and descriptive. It avoids overloading existing functions and
it's very clear what it's doing. Imagine if there's an autocomplete feature
in IntelliJ or other IDE for our DSL in the future, it's not favorable to
show 6 `windowedBy` functions.
2. Option 1 is operated on `windowedStream` to configure how it should be
outputted. Option 2 operates on `KGroupedStream` to produce
`windowedStream` as well as configure how `windowedStream` should be
outputted. I feel it's better to have a `windowedStream` and then
conf

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Guozhang Wang
Hao is right. I think that's the hindsight we have for `suppress`: since it
can be applied anywhere on a K(windowed)Table, it allows an awkward degree of
programming flexibility, and I feel it's better to constrain its application
scope.

And I also agree with John that, unless any of us feel strongly about any
of the options, Hao could make the final call on the naming.


Guozhang

On Wed, Mar 23, 2022 at 1:49 PM Hao Li  wrote:

> For
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .aggregate(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .mapValues(..)
>
> I think after `aggregate` it's already a table and then the emit strategy
> is too late to control
> how windowed stream is outputted to table. This is the concern Guozhang
> raised about having this in existing `suppress` operator as well.
>
> Thanks,
> Hao
>
> On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > Thank you for your answers to my questions!
> >
> > I see the argument about conciseness of configuring a stream with
> > methods instead of config objects. I just miss a bit the descriptive
> > aspect.
> >
> > What about
> >
> > stream
> >   .groupBy(..)
> >   .windowedBy(..)
> >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >   .aggregate(..)
> >   .mapValues(..)
> >
> > I have also another question. Why should emitting of results be
> > controlled by the window level api? If I want to emit results for each
> > input record the emit strategy is quite independent from the window. So
> > I somehow share Matthias' and Guozhang's concern that the emit strategy
> > seems misplaced there.
> >
> > What are the arguments against?
> >
> > stream
> >   .groupBy(..)
> >   .windowedBy(..)
> >   .aggregate(..)
> >   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
> >   .mapValues(..)
> >
> >
> > A final administrative request: Hao, could you please add the rejected
> > alternatives to the KIP so that future us will know why we rejected them?
> >
> > Best,
> > Bruno
> >
> > On 23.03.22 19:38, John Roesler wrote:
> > > Hi all,
> > >
> > > I can see both sides of this.
> > >
> > > On one hand, when we say
> > > "stream.groupBy().windowBy().count()", it seems like we're
> > > telling KS to take the raw stream, group it based on key,
> > > then window it based on time, and then compute an
> > > aggregation on the windows. In that model, "trigger()" would
> > > have to mean something like "trigger it", which doesn't
> > > really make sense, since we aren't "triggering" the
> > > aggregation (then again, to an outside observer, it would
> > > appear that way... food for thought).
> > >
> > > Another way to look at it is that all we're really doing is
> > > > configuring a windowed aggregation on the stream, and we're
> > > doing it with a progressive builder interface. In other
> > > words, the above is just a progressive builder for
> > > configuring an operation like
> > > "stream.aggregate(groupingConfig, windowingConfig,
> > > countFn)". Under the latter interpretation of the DSL, it
> > > makes perfect sense to add more optional progressive builder
> > > methods like trigger() to the WindowedKStream interfaces.
> > >
> > > Since part of the motivation for choosing the word "trigger"
> > > here is to stay close to what Flink defines, I'll also point
> > > out that Flink's syntax is also
> > > "stream.keyBy().window().trigger().aggregate()". Not that
> > > their API is the holy grail or anything, but it's at least
> > > an indication that this API isn't a horrible mistake.
> > >
> > > All other things being equal, I also prefer to leave tie-
> > > breakers in the hands of the contributor. So, if we've all
> > > said our piece and Hao still prefers option 1, then (as long
> > > as we don't think it's a horrible mistake), I think we
> > > should just let him go for it.
> > >
> > > Speaking of which, after reviewing the responses regarding
> > > deprecating `Suppressed#onWindowClose`, I still think we
> > > should just go ahead and deprecate it. Although it's not
> > > expressed exactly the same way, it still does exactly the
> > > same thing, or so close that it seems confusing to keep
> > > both. But again, if Hao really prefers to keep both, I won't
> > > insist on it :)
> > >
> > > Thanks all,
> > > -John
> > >
> > > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> > >> Thanks Bruno!
> > >>
> > >> Argument for option 1 is:
> > > >> 1. Concise and descriptive. It avoids overloading existing functions and
> > > >> it's very clear what it's doing. Imagine if there's an autocomplete feature
> > > >> in IntelliJ or other IDE for our DSL in the future, it's not favorable to
> > > >> show 6 `windowedBy` functions.
> > > >> 2. Option 1 is operated on `windowedStream` to configure how it should be
> > > >> outputted. Option 2 operates on `KGroupedStream` to produce
> > > >> `windowedStream` as well as configure how `windowedStream` should be
> > > >> outputted. I fee

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Hao Li
For

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..)
  .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
  .mapValues(..)

I think that after `aggregate` it's already a table, so by then it's too late
for the emit strategy to control how the windowed stream is output to the
table. This is the same concern Guozhang raised about putting this in the
existing `suppress` operator.

Thanks,
Hao

On Wed, Mar 23, 2022 at 1:05 PM Bruno Cadonna  wrote:

> Hi,
>
> Thank you for your answers to my questions!
>
> I see the argument about conciseness of configuring a stream with
> methods instead of config objects. I just miss a bit the descriptive
> aspect.
>
> What about
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .aggregate(..)
>   .mapValues(..)
>
> I have also another question. Why should emitting of results be
> controlled by the window level api? If I want to emit results for each
> input record the emit strategy is quite independent from the window. So
> I somehow share Matthias' and Guozhang's concern that the emit strategy
> seems misplaced there.
>
> What are the arguments against?
>
> stream
>   .groupBy(..)
>   .windowedBy(..)
>   .aggregate(..)
>   .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
>   .mapValues(..)
>
>
> A final administrative request: Hao, could you please add the rejected
> alternatives to the KIP so that future us will know why we rejected them?
>
> Best,
> Bruno
>
> On 23.03.22 19:38, John Roesler wrote:
> > Hi all,
> >
> > I can see both sides of this.
> >
> > On one hand, when we say
> > "stream.groupBy().windowBy().count()", it seems like we're
> > telling KS to take the raw stream, group it based on key,
> > then window it based on time, and then compute an
> > aggregation on the windows. In that model, "trigger()" would
> > have to mean something like "trigger it", which doesn't
> > really make sense, since we aren't "triggering" the
> > aggregation (then again, to an outside observer, it would
> > appear that way... food for thought).
> >
> > Another way to look at it is that all we're really doing is
> > configuring a windowed aggregation on the stream, and we're
> > doing it with a progressive builder interface. In other
> > words, the above is just a progressive builder for
> > configuring an operation like
> > "stream.aggregate(groupingConfig, windowingConfig,
> > countFn)". Under the latter interpretation of the DSL, it
> > makes perfect sense to add more optional progressive builder
> > methods like trigger() to the WindowedKStream interfaces.
> >
> > Since part of the motivation for choosing the word "trigger"
> > here is to stay close to what Flink defines, I'll also point
> > out that Flink's syntax is also
> > "stream.keyBy().window().trigger().aggregate()". Not that
> > their API is the holy grail or anything, but it's at least
> > an indication that this API isn't a horrible mistake.
> >
> > All other things being equal, I also prefer to leave tie-
> > breakers in the hands of the contributor. So, if we've all
> > said our piece and Hao still prefers option 1, then (as long
> > as we don't think it's a horrible mistake), I think we
> > should just let him go for it.
> >
> > Speaking of which, after reviewing the responses regarding
> > deprecating `Suppressed#onWindowClose`, I still think we
> > should just go ahead and deprecate it. Although it's not
> > expressed exactly the same way, it still does exactly the
> > same thing, or so close that it seems confusing to keep
> > both. But again, if Hao really prefers to keep both, I won't
> > insist on it :)
> >
> > Thanks all,
> > -John
> >
> > On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> >> Thanks Bruno!
> >>
> >> Argument for option 1 is:
> >> 1. Concise and descriptive. It avoids overloading existing functions and
> >> it's very clear what it's doing. Imagine if there's an autocomplete feature
> >> in IntelliJ or other IDE for our DSL in the future, it's not favorable to
> >> show 6 `windowedBy` functions.
> >> 2. Option 1 is operated on `windowedStream` to configure how it should be
> >> outputted. Option 2 operates on `KGroupedStream` to produce
> >> `windowedStream` as well as configure how `windowedStream` should be
> >> outputted. I feel it's better to have a `windowedStream` and then
> >> configure how it can be outputted. Somehow I feel option 2 breaks the
> >> builder pattern.
> >> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> >> kinds of different parameters into it to avoid future overloading, it's too
> >> bloated and not very user friendly.
> >>
> >> I agree option 1's `trigger` function is configuring the stream which feels
> >> different from existing `count` or `aggregate` etc. Configuring might be
> >> also a kind of action to stream :) I'm not sure if it breaks DSL principle
> >> and if it does,
> >> can we relax the principle given the benefits compa

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Bruno Cadonna

Hi,

Thank you for your answers to my questions!

I see the argument about conciseness of configuring a stream with
methods instead of config objects. I just miss the descriptive aspect a bit.


What about

stream
 .groupBy(..)
 .windowedBy(..)
 .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
 .aggregate(..)
 .mapValues(..)

I also have another question. Why should the emitting of results be
controlled by the window-level API? If I want to emit results for each
input record, the emit strategy is quite independent of the window. So
I somewhat share Matthias' and Guozhang's concern that the emit strategy
seems misplaced there.
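
To make the difference between the two behaviours concrete, here is a toy
sketch in plain Java. It is not Kafka Streams code: `ToyWindowedCounter`, its
`Emit` enum, and the in-order, no-grace-period assumption are all hypothetical
simplifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model, not Kafka Streams code: counts records in tumbling windows and
// shows how the two emit strategies differ in what reaches downstream.
class ToyWindowedCounter {
    enum Emit { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    private final long windowSizeMs;
    private final Emit emit;
    // Window start -> running count, for windows that are still open.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    ToyWindowedCounter(long windowSizeMs, Emit emit) {
        this.windowSizeMs = windowSizeMs;
        this.emit = emit;
    }

    // Processes one record. Assumes timestamps arrive in order
    // (no late records, no grace period).
    void process(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Stream time has advanced: every window that ends at or before
        // this timestamp is now closed.
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + windowSizeMs <= timestampMs) {
            Map.Entry<Long, Integer> closed = openWindows.pollFirstEntry();
            if (emit == Emit.ON_WINDOW_CLOSE) {
                // Emit exactly one final result per window.
                output.add(closed.getKey() + "=" + closed.getValue());
            }
        }

        int count = openWindows.merge(windowStart, 1, Integer::sum);
        if (emit == Emit.ON_WINDOW_UPDATE) {
            // Emit an intermediate result for every input record.
            output.add(windowStart + "=" + count);
        }
    }

    List<String> output() {
        return output;
    }
}
```

With timestamps 1, 5 and 12 and a window size of 10, the update variant emits
three intermediate results, while the close variant emits a single final
result for window 0 once the record at 12 moves stream time past its end.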


What are the arguments against the following?

stream
 .groupBy(..)
 .windowedBy(..)
 .aggregate(..)
 .withEmitStrategy(EmitStrategy.ON_WINDOW_CLOSE)
 .mapValues(..)


A final administrative request: Hao, could you please add the rejected 
alternatives to the KIP so that future us will know why we rejected them?


Best,
Bruno

On 23.03.22 19:38, John Roesler wrote:

Hi all,

I can see both sides of this.

On one hand, when we say
"stream.groupBy().windowBy().count()", it seems like we're
telling KS to take the raw stream, group it based on key,
then window it based on time, and then compute an
aggregation on the windows. In that model, "trigger()" would
have to mean something like "trigger it", which doesn't
really make sense, since we aren't "triggering" the
aggregation (then again, to an outside observer, it would
appear that way... food for thought).

Another way to look at it is that all we're really doing is
configuring a windowed aggregation on the stream, and we're
doing it with a progressive builder interface. In other
words, the above is just a progressive builder for
configuring an operation like
"stream.aggregate(groupingConfig, windowingConfig,
countFn)". Under the latter interpretation of the DSL, it
makes perfect sense to add more optional progressive builder
methods like trigger() to the WindowedKStream interfaces.

Since part of the motivation for choosing the word "trigger"
here is to stay close to what Flink defines, I'll also point
out that Flink's syntax is also
"stream.keyBy().window().trigger().aggregate()". Not that
their API is the holy grail or anything, but it's at least
an indication that this API isn't a horrible mistake.

All other things being equal, I also prefer to leave tie-
breakers in the hands of the contributor. So, if we've all
said our piece and Hao still prefers option 1, then (as long
as we don't think it's a horrible mistake), I think we
should just let him go for it.

Speaking of which, after reviewing the responses regarding
deprecating `Suppressed#onWindowClose`, I still think we
should just go ahead and deprecate it. Although it's not
expressed exactly the same way, it still does exactly the
same thing, or so close that it seems confusing to keep
both. But again, if Hao really prefers to keep both, I won't
insist on it :)

Thanks all,
-John

On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:

Thanks Bruno!

Argument for option 1 is:
1. Concise and descriptive. It avoids overloading existing functions and
it's very clear what it's doing. Imagine if there's an autocomplete feature
in IntelliJ or other IDE for our DSL in the future, it's not favorable to
show 6 `windowedBy` functions.
2. Option 1 is operated on `windowedStream` to configure how it should be
outputted. Option 2 operates on `KGroupedStream` to produce
`windowedStream` as well as configure how `windowedStream` should be
 outputted. I feel it's better to have a `windowedStream` and then
configure how it can be outputted. Somehow I feel option 2 breaks the
builder pattern.
3. `WindowedByParameters` doesn't seem very descriptive. If we put all
kinds of different parameters into it to avoid future overloading, it's too
bloated and not very user friendly.

I agree option 1's `trigger` function is configuring the stream which feels
different from existing `count` or `aggregate` etc. Configuring might be
also a kind of action to stream :) I'm not sure if it breaks DSL principle
and if it does,
can we relax the principle given the benefits compared to option 2)? Maybe
John can chime in as the DSL grammar author.

Thanks,
Hao

On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna  wrote:


Hi Hao,

I agree with Guozhang: Great summary! Thank you!

Regarding "aligned with other config class names", there is this DSL
grammar John once specified
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
and we have already used it in the code. I found the grammar quite useful.

I am undecided if option 1 is really worth it. What are actually the
arguments in favor of it? Is it only that we do not need to overload
other methods? This does not seem worth to break DSL principles. An
alternative proposal would be to go with option 2 and conform with the
grammar above:

 TimeWindowedKStream windowedBy(final Windows
windows, WindowedByParameters parameters);

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread John Roesler
Hi all,

I can see both sides of this.

On one hand, when we say
"stream.groupBy().windowBy().count()", it seems like we're
telling KS to take the raw stream, group it based on key,
then window it based on time, and then compute an
aggregation on the windows. In that model, "trigger()" would
have to mean something like "trigger it", which doesn't
really make sense, since we aren't "triggering" the
aggregation (then again, to an outside observer, it would
appear that way... food for thought).

Another way to look at it is that all we're really doing is
configuring a windowed aggregation on the stream, and we're
doing it with a progressive builder interface. In other
words, the above is just a progressive builder for
configuring an operation like
"stream.aggregate(groupingConfig, windowingConfig,
countFn)". Under the latter interpretation of the DSL, it
makes perfect sense to add more optional progressive builder
methods like trigger() to the WindowedKStream interfaces.
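
A rough sketch of that second interpretation, using toy types rather than the
real Kafka Streams classes. `ToyStream`, `ToyGroupedStream`,
`ToyWindowedStream`, and `AggregationSpec` are hypothetical, and the optional
step is named `emitStrategy` purely for illustration, since the name is still
under discussion in this thread.

```java
// Toy model of the "progressive builder" reading of the DSL.
// None of these types are Kafka Streams classes; all names are hypothetical.
final class AggregationSpec {
    final String grouping;
    final String windowing;
    final String emit;
    final String fn;

    AggregationSpec(String grouping, String windowing, String emit, String fn) {
        this.grouping = grouping;
        this.windowing = windowing;
        this.emit = emit;
        this.fn = fn;
    }

    @Override
    public String toString() {
        return "aggregate(grouping=" + grouping + ", windowing=" + windowing
                + ", emit=" + emit + ", fn=" + fn + ")";
    }
}

final class ToyStream {
    ToyGroupedStream groupBy(String grouping) {
        return new ToyGroupedStream(grouping);
    }
}

final class ToyGroupedStream {
    private final String grouping;

    ToyGroupedStream(String grouping) {
        this.grouping = grouping;
    }

    ToyWindowedStream windowedBy(String windowing) {
        // Assumed default: emit on every update unless configured otherwise.
        return new ToyWindowedStream(grouping, windowing, "ON_WINDOW_UPDATE");
    }
}

final class ToyWindowedStream {
    private final String grouping;
    private final String windowing;
    private final String emit;

    ToyWindowedStream(String grouping, String windowing, String emit) {
        this.grouping = grouping;
        this.windowing = windowing;
        this.emit = emit;
    }

    // Optional builder step: only records configuration, performs no work.
    ToyWindowedStream emitStrategy(String emit) {
        return new ToyWindowedStream(grouping, windowing, emit);
    }

    // Terminal step: the full operation is only known at this point.
    AggregationSpec count() {
        return new AggregationSpec(grouping, windowing, emit, "count");
    }
}
```

Here `new ToyStream().groupBy("key").windowedBy("5min").emitStrategy("ON_WINDOW_CLOSE").count()`
just accumulates configuration until `count()` assembles the whole operation,
which is the sense in which an extra optional method on the windowed stage
fits the builder reading.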

Since part of the motivation for choosing the word "trigger"
here is to stay close to what Flink defines, I'll also point
out that Flink's syntax is also
"stream.keyBy().window().trigger().aggregate()". Not that
their API is the holy grail or anything, but it's at least
an indication that this API isn't a horrible mistake.

All other things being equal, I also prefer to leave tie-
breakers in the hands of the contributor. So, if we've all
said our piece and Hao still prefers option 1, then (as long
as we don't think it's a horrible mistake), I think we
should just let him go for it.

Speaking of which, after reviewing the responses regarding
deprecating `Suppressed#onWindowClose`, I still think we
should just go ahead and deprecate it. Although it's not
expressed exactly the same way, it still does exactly the
same thing, or so close that it seems confusing to keep
both. But again, if Hao really prefers to keep both, I won't
insist on it :)

Thanks all,
-John

On Wed, 2022-03-23 at 09:59 -0700, Hao Li wrote:
> Thanks Bruno!
> 
> Argument for option 1 is:
> 1. Concise and descriptive. It avoids overloading existing functions and
> it's very clear what it's doing. Imagine if there's a autocomplete feature
> in Intellij or other IDE for our DSL in the future, it's not favorable to
> show 6 `windowedBy` functions.
> 2. Option 1 is operated on `windowedStream` to configure how it should be
> outputted. Option 2 operates on `KGroupedStream` to produce
> `windowedStream` as well as configure how `windowedStream` should be
> outputted. I feel it's better to have a `windowedStream` and then
> configure how it can be outputted. Somehow I feel option 2 breaks the
> builder pattern.
> 3. `WindowedByParameters` doesn't seem very descriptive. If we put all
> kinds of different parameters into it to avoid future overloading, it's too
> bloated and not very user friendly.
> 
> I agree option 1's `trigger` function is configuring the stream which feels
> different from existing `count` or `aggregate` etc. Configuring might be
> also a kind of action to stream :) I'm not sure if it breaks DSL principle
> and if it does,
> can we relax the principle given the benefits compared to option 2)? Maybe
> John can chime in as the DSL grammar author.
> 
> Thanks,
> Hao
> 
> On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna  wrote:
> 
> > Hi Hao,
> > 
> > I agree with Guozhang: Great summary! Thank you!
> > 
> > Regarding "aligned with other config class names", there is this DSL
> > grammar John once specified
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> > and we have already used it in the code. I found the grammar quite useful.
> > 
> > I am undecided if option 1 is really worth it. What are actually the
> > arguments in favor of it? Is it only that we do not need to overload
> > other methods? This does not seem worth to break DSL principles. An
> > alternative proposal would be to go with option 2 and conform with the
> > grammar above:
> > 
> >  TimeWindowedKStream windowedBy(final Windows
> > windows, WindowedByParameters parameters);
> > 
> > TimeWindowedKStream windowedBy(final SlidingWindows windows,
> > WindowedByParameters parameters);
> > 
> > SessionWindowedKStream windowedBy(final SessionWindows windows,
> > WindowedByParameters parameters);
> > 
> > This is similar to option 2 in the KIP, but it ensures that we put all
> > future needed configs in the parameters object and we do not need to
> > overload the methods anymore.
> > 
> > Then if we also get KAFKA-10298 done, we could even collapse all
> > `windowedBy()` methods into one.
> > 
> > Best,
> > Bruno
> > 
> > On 22.03.22 22:31, Guozhang Wang wrote:
> > > Thanks for the great summary Hao. I'm still learning towards option 2)
> > > here, and I'm in favor of `trigger` as function name, and `Triggered` as
> > > config class name (mainly to be aligned with other config class names).
> > > Also want to see other's preferences between the options, as well as the
> > >

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Hao Li
Thanks Bruno!

Argument for option 1 is:
1. Concise and descriptive. It avoids overloading existing functions and
it's very clear what it's doing. Imagine if there's an autocomplete feature
in IntelliJ or another IDE for our DSL in the future; it's not favorable to
show 6 `windowedBy` functions.
2. Option 1 is operated on `windowedStream` to configure how it should be
outputted. Option 2 operates on `KGroupedStream` to produce
`windowedStream` as well as configure how `windowedStream` should be
outputted. I feel it's better to have a `windowedStream` and then
configure how it can be outputted. Somehow I feel option 2 breaks the
builder pattern.
3. `WindowedByParameters` doesn't seem very descriptive. If we put all
kinds of different parameters into it to avoid future overloading, it's too
bloated and not very user friendly.

I agree option 1's `trigger` function is configuring the stream, which feels
different from the existing `count` or `aggregate` etc. Configuring might
also be a kind of action on the stream :) I'm not sure if it breaks the DSL
principles and, if it does, can we relax the principles given the benefits
compared to option 2? Maybe John can chime in as the DSL grammar author.

Thanks,
Hao

On Wed, Mar 23, 2022 at 2:59 AM Bruno Cadonna  wrote:

> Hi Hao,
>
> I agree with Guozhang: Great summary! Thank you!
>
> Regarding "aligned with other config class names", there is this DSL
> grammar John once specified
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> and we have already used it in the code. I found the grammar quite useful.
>
> I am undecided if option 1 is really worth it. What are actually the
> arguments in favor of it? Is it only that we do not need to overload
> other methods? This does not seem worth to break DSL principles. An
> alternative proposal would be to go with option 2 and conform with the
> grammar above:
>
>  TimeWindowedKStream windowedBy(final Windows
> windows, WindowedByParameters parameters);
>
> TimeWindowedKStream windowedBy(final SlidingWindows windows,
> WindowedByParameters parameters);
>
> SessionWindowedKStream windowedBy(final SessionWindows windows,
> WindowedByParameters parameters);
>
> This is similar to option 2 in the KIP, but it ensures that we put all
> future needed configs in the parameters object and we do not need to
> overload the methods anymore.
>
> Then if we also get KAFKA-10298 done, we could even collapse all
> `windowedBy()` methods into one.
>
> Best,
> Bruno
>
> On 22.03.22 22:31, Guozhang Wang wrote:
> > Thanks for the great summary Hao. I'm still learning towards option 2)
> > here, and I'm in favor of `trigger` as function name, and `Triggered` as
> > config class name (mainly to be aligned with other config class names).
> > Also want to see other's preferences between the options, as well as the
> > namings.
> >
> >
> > Guozhang
> >
> >
> >
> > On Tue, Mar 22, 2022 at 12:23 PM Hao Li 
> wrote:
> >
> >> `windowedStream.onWindowClose()` was the original option 1
> >> (`windowedStream.emitFinal()`) but was rejected
> >> because we could add more emit types and this will result in adding more
> >> functions. I still prefer the
> >> "windowedStream.someFunc(Controlled.onWindowClose)"
> >> model since it's flexible and clear that it's configuring the emit
> policy.
> >> Let me summarize all the naming options we have and compare:
> >>
> >> *API function name:*
> >>
> >> *1. `windowedStream.trigger()`*
> >>  Pros:
> >> i. Simple
> >> ii. Similar to Flink's trigger function (is this a con
> actually?)
> >>  Cons:
> >> i. `trigger()` can be confused with Flink trigger (raised by
> John)
> >> ii. `trigger()` feels like an operation instead of a configure
> >> function (raised by Bruno)?
> >>
> >> *2. `windowedStream.emitTrigger()`*
> >>   Pros:
> >> i. Avoid confusion from Flink's trigger API
> >> ii. `emitTrigger` feels like configuring the trigger because
> >> "trigger" here is a noun instead of verbose in `trigger()`
> >>   Cons:
> >>   i: Verbose?
> >>  ii: Not consistent with `Suppressed.untilWindowClose`?
> >>
> >>
> >> *Config class/object name:*
> >>
> >> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
> >>   Cons:
> >>   i. Doesn't go along with `trigger` (raised by Bruno)
> >>
> >> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
> >>
> >> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
> >>
> >> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> >> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
> >>   This is a combination of different names like: `EmitConfig`,
> >> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
> >>
> >>
> >> If we are settled with option 1), we can add new options to these names
> and
> >> comment on their Pros and Cons.
> >>
> >> Hao
> >>
> >>
> >>
> >>
> >>
> >> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang 
> wrote:
> >>
> >>> I see what 

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Bruno Cadonna

Hi Hao,

I agree with Guozhang: Great summary! Thank you!

Regarding "aligned with other config class names", there is this DSL 
grammar John once specified

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
and we have already used it in the code. I found the grammar quite useful.

I am undecided whether option 1 is really worth it. What are actually the
arguments in favor of it? Is it only that we do not need to overload
other methods? That does not seem worth breaking DSL principles for. An
alternative proposal would be to go with option 2 and conform to the
grammar above:


<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows,
WindowedByParameters parameters);


TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows,
WindowedByParameters parameters);


SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows,
WindowedByParameters parameters);


This is similar to option 2 in the KIP, but it ensures that we put all 
future needed configs in the parameters object and we do not need to 
overload the methods anymore.


Then if we also get KAFKA-10298 done, we could even collapse all 
`windowedBy()` methods into one.
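
A minimal sketch of what such a parameters object could look like, assuming
it follows the static-factory-plus-`with...` style of existing config classes
such as `Materialized`. `WindowedByParameters` is only a proposal in this
thread; the enum `EmitMode`, the `defaults()` factory, and the `withName`
option are hypothetical and exist only to show how a future config could be
added without another `windowedBy()` overload:

```java
// Hypothetical sketch of a parameters object; WindowedByParameters is only
// proposed in this thread, and every member below is an assumption.
enum EmitMode { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

final class WindowedByParameters {
    private final EmitMode emitMode;
    private final String name; // stand-in for some future, unrelated config

    private WindowedByParameters(EmitMode emitMode, String name) {
        this.emitMode = emitMode;
        this.name = name;
    }

    // Starting point: emit on every update, no name set.
    static WindowedByParameters defaults() {
        return new WindowedByParameters(EmitMode.ON_EACH_UPDATE, null);
    }

    // Each with-method returns a new immutable instance with one field changed.
    WindowedByParameters withEmitMode(EmitMode emitMode) {
        return new WindowedByParameters(emitMode, name);
    }

    WindowedByParameters withName(String name) {
        return new WindowedByParameters(emitMode, name);
    }

    EmitMode emitMode() {
        return emitMode;
    }

    String name() {
        return name;
    }
}
```

With this shape, a later config becomes one more `with...` method on the
parameters class, and the `windowedBy(windows, parameters)` signatures stay
unchanged.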


Best,
Bruno

On 22.03.22 22:31, Guozhang Wang wrote:

Thanks for the great summary Hao. I'm still learning towards option 2)
here, and I'm in favor of `trigger` as function name, and `Triggered` as
config class name (mainly to be aligned with other config class names).
Also want to see other's preferences between the options, as well as the
namings.


Guozhang



On Tue, Mar 22, 2022 at 12:23 PM Hao Li  wrote:


`windowedStream.onWindowClose()` was the original option 1
(`windowedStream.emitFinal()`) but was rejected
because we could add more emit types and this will result in adding more
functions. I still prefer the
"windowedStream.someFunc(Controlled.onWindowClose)"
model since it's flexible and clear that it's configuring the emit policy.
Let me summarize all the naming options we have and compare:

*API function name:*

*1. `windowedStream.trigger()`*
 Pros:
i. Simple
ii. Similar to Flink's trigger function (is this a con actually?)
 Cons:
i. `trigger()` can be confused with Flink trigger (raised by John)
ii. `trigger()` feels like an operation instead of a configure
function (raised by Bruno)?

*2. `windowedStream.emitTrigger()`*
  Pros:
i. Avoid confusion from Flink's trigger API
ii. `emitTrigger` feels like configuring the trigger because
"trigger" here is a noun instead of verbose in `trigger()`
  Cons:
  i: Verbose?
 ii: Not consistent with `Suppressed.untilWindowClose`?


*Config class/object name:*

1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
  Cons:
  i. Doesn't go along with `trigger` (raised by Bruno)

2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*

3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*

4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
*`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
  This is a combination of different names like: `EmitConfig`,
`EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...


If we are settled with option 1), we can add new options to these names and
comment on their Pros and Cons.

Hao





On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang  wrote:


I see what you mean now, and I think it's a fair point that composing
`trigger` and `emitted` seems awkward.

Re: data process operator v.s. control operator, I shared your concern as
well, and here's my train of thoughts: Having only data process operators
was my primary motivation for how we add the suppress operator --- it
indeed "suppresses" data. But as a hind-sight it's disadvantage is that,
for example in Suppressed.onWindowClose() should be only related to an
earlier windowedBy operator which is possibly very far from it in the
resulting DSL code. It's not only a bit awkward for users to write such
code, but also in such cases the DSL builder needs to maintain and
propagate this information to the suppress operator further down. So we are
now thinking about "putting the control object as close as to where the
related processor really happens". And in that world my original
preference was somewhere in option 2), i.e. just put the control as a param
of the related "windowedBy" operator, but the trade-off is we keep adding
overloaded functions to these operators. So after some back and forth
thoughts I'm learning towards relaxing our principles to only have
processing operators but no flow-control operators. That being said, if you
have any ideas that we can have both world's benefits I'm all ears.

Re: using a direct function like "windowedStream.onWindowClose()" v.s.
"windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
for the latter is for extensibility without adding more functions in the
future. If people feel this is not worthy we can do the first option as
well. If we just feel

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Guozhang Wang
Thanks for the great summary Hao. I'm still leaning towards option 2)
here, and I'm in favor of `trigger` as the function name, and `Triggered` as
the config class name (mainly to be aligned with other config class names).
Also want to see others' preferences between the options, as well as the
naming.


Guozhang



On Tue, Mar 22, 2022 at 12:23 PM Hao Li  wrote:

> `windowedStream.onWindowClose()` was the original option 1
> (`windowedStream.emitFinal()`) but was rejected
> because we could add more emit types and this will result in adding more
> functions. I still prefer the
> "windowedStream.someFunc(Controlled.onWindowClose)"
> model since it's flexible and clear that it's configuring the emit policy.
> Let me summarize all the naming options we have and compare:
>
> *API function name:*
>
> *1. `windowedStream.trigger()`*
> Pros:
>i. Simple
>ii. Similar to Flink's trigger function (is this a con actually?)
> Cons:
>i. `trigger()` can be confused with Flink trigger (raised by John)
>ii. `trigger()` feels like an operation instead of a configure
> function (raised by Bruno)?
>
> *2. `windowedStream.emitTrigger()`*
>  Pros:
>i. Avoid confusion from Flink's trigger API
>ii. `emitTrigger` feels like configuring the trigger because
> "trigger" here is a noun instead of verbose in `trigger()`
>  Cons:
>  i: Verbose?
> ii: Not consistent with `Suppressed.untilWindowClose`?
>
>
> *Config class/object name:*
>
> 1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
>  Cons:
>  i. Doesn't go along with `trigger` (raised by Bruno)
>
> 2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*
>
> 3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*
>
> 4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
> *`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
>  This is a combination of different names like: `EmitConfig`,
> `EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...
>
>
> If we are settled with option 1), we can add new options to these names and
> comment on their Pros and Cons.
>
> Hao
>
>
>
>
>
> On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang  wrote:
>
> > I see what you mean now, and I think it's a fair point that composing
> > `trigger` and `emitted` seems awkward.
> >
> > Re: data process operator v.s. control operator, I shared your concern as
> > well, and here's my train of thoughts: Having only data process operators
> > was my primary motivation for how we add the suppress operator --- it
> > indeed "suppresses" data. But as a hind-sight it's disadvantage is that,
> > for example in Suppressed.onWindowClose() should be only related to an
> > earlier windowedBy operator which is possibly very far from it in the
> > resulting DSL code. It's not only a bit awkward for users to write such
> > code, but also in such cases the DSL builder needs to maintain and
> > propagate this information to the suppress operator further down. So we
> are
> > now thinking about "putting the control object as close as to where the
> > related processor really happens". And in that world my original
> > preference was somewhere in option 2), i.e. just put the control as a
> param
> > of the related "windowedBy" operator, but the trade-off is we keep adding
> > overloaded functions to these operators. So after some back and forth
> > thoughts I'm learning towards relaxing our principles to only have
> > processing operators but no flow-control operators. That being said, if
> you
> > have any ideas that we can have both world's benefits I'm all ears.
> >
> > Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> > "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> > for the latter is for extensibility without adding more functions in the
> > future. If people feel this is not worthy we can do the first option as
> > well. If we just feel the `trigger` and `emitted` does not feel
> composible
> > together, maybe we can consider something like
> > `windowedStream.trigger(Triggered.onWindowClose())"?
> >
> > Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
> > we should use past term as well :P But if it's not bothering people much
> > I'd say we just keep it than deprecate/rename new APIs.
> >
> >
> > On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna 
> wrote:
> >
> > > Hi Guozhang,
> > >
> > > There is no semantic difference. It is a cosmetic difference.
> > > Conceptually, I relate `Emitted` with the aggregation and not with
> > > `trigger()` in the API flow, because the aggregation emits the result
> > > not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> > > of the config object passed to `trigger()`.
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > > On 22.03.22 17:24, Guozhang Wang wrote:
> > > > Hi Bruno,
> > > >
> > > > Could you elaborate a bit more here, what's the semantic difference
> > > between
> > > > "the aggregat

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Hao Li
`windowedStream.onWindowClose()` was the original option 1
(`windowedStream.emitFinal()`) but was rejected
because we could add more emit types and this will result in adding more
functions. I still prefer the
"windowedStream.someFunc(Controlled.onWindowClose)"
model since it's flexible and clear that it's configuring the emit policy.
Let me summarize all the naming options we have and compare:

*API function name:*

*1. `windowedStream.trigger()`*
Pros:
   i. Simple
   ii. Similar to Flink's trigger function (is this a con actually?)
Cons:
   i. `trigger()` can be confused with Flink's trigger (raised by John)
   ii. `trigger()` feels like an operation instead of a configuration
       function (raised by Bruno)?

*2. `windowedStream.emitTrigger()`*
Pros:
   i. Avoids confusion with Flink's trigger API
   ii. `emitTrigger` feels like configuring the trigger because
       "trigger" here is a noun instead of a verb as in `trigger()`
Cons:
   i. Verbose?
   ii. Not consistent with `Suppressed.untilWindowClose`?


*Config class/object name:*

1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
 Cons:
 i. Doesn't go along with `trigger` (raised by Bruno)

2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*

3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*

4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
*`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
 This is a combination of different names like: `EmitConfig`,
`EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...


If we are settled on option 1), we can add new options to these names and
comment on their pros and cons.
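
A minimal sketch of the "windowedStream.someFunc(Controlled.onWindowClose)"
model, assuming `trigger` as the function name and `Triggered` as the config
class, which are just one pairing from the candidates above. Neither
`Triggered` nor `WindowedStreamSketch` is an existing Kafka Streams class;
they are hypothetical stand-ins:

```java
// Hypothetical sketch: `Triggered` is one of the candidate names from the
// list above, not an existing Kafka Streams class.
final class Triggered {
    enum Kind { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    private final Kind kind;

    private Triggered(Kind kind) {
        this.kind = kind;
    }

    static Triggered onWindowClose() {
        return new Triggered(Kind.ON_WINDOW_CLOSE);
    }

    static Triggered onEachUpdate() {
        return new Triggered(Kind.ON_EACH_UPDATE);
    }

    Kind kind() {
        return kind;
    }
}

// Hypothetical stand-in for a windowed stream; it only tracks the emit config.
final class WindowedStreamSketch {
    private final Triggered triggered;

    WindowedStreamSketch() {
        this(Triggered.onEachUpdate());
    }

    private WindowedStreamSketch(Triggered triggered) {
        this.triggered = triggered;
    }

    // The single configuration method. A new emit type would add a factory
    // on Triggered, not another method here.
    WindowedStreamSketch trigger(Triggered triggered) {
        return new WindowedStreamSketch(triggered);
    }

    String describe() {
        return "emit=" + triggered.kind();
    }
}
```

The design point is the one argued above: with a direct
`windowedStream.onWindowClose()` every new emit type needs a new method on
the stream interface, while with a config object the stream interface keeps
one method and the variants live in the config class.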

Hao





On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang  wrote:

> I see what you mean now, and I think it's a fair point that composing
> `trigger` and `emitted` seems awkward.
>
> Re: data process operator v.s. control operator, I shared your concern as
> well, and here's my train of thoughts: Having only data process operators
> was my primary motivation for how we add the suppress operator --- it
> indeed "suppresses" data. But as a hind-sight it's disadvantage is that,
> for example in Suppressed.onWindowClose() should be only related to an
> earlier windowedBy operator which is possibly very far from it in the
> resulting DSL code. It's not only a bit awkward for users to write such
> code, but also in such cases the DSL builder needs to maintain and
> propagate this information to the suppress operator further down. So we are
> now thinking about "putting the control object as close as to where the
> related processor really happens". And in that world my original
> preference was somewhere in option 2), i.e. just put the control as a param
> of the related "windowedBy" operator, but the trade-off is we keep adding
> overloaded functions to these operators. So after some back and forth
> thoughts I'm learning towards relaxing our principles to only have
> processing operators but no flow-control operators. That being said, if you
> have any ideas that we can have both world's benefits I'm all ears.
>
> Re: using a direct function like "windowedStream.onWindowClose()" v.s.
> "windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
> for the latter is for extensibility without adding more functions in the
> future. If people feel this is not worthy we can do the first option as
> well. If we just feel the `trigger` and `emitted` does not feel composible
> together, maybe we can consider something like
> `windowedStream.trigger(Triggered.onWindowClose())"?
>
> Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
> we should use past term as well :P But if it's not bothering people much
> I'd say we just keep it than deprecate/rename new APIs.
>
>
> On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna  wrote:
>
> > Hi Guozhang,
> >
> > There is no semantic difference. It is a cosmetic difference.
> > Conceptually, I relate `Emitted` with the aggregation and not with
> > `trigger()` in the API flow, because the aggregation emits the result
> > not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> > of the config object passed to `trigger()`.
> >
> >
> > Best,
> > Bruno
> >
> > On 22.03.22 17:24, Guozhang Wang wrote:
> > > Hi Bruno,
> > >
> > > Could you elaborate a bit more here, what's the semantic difference
> > between
> > > "the aggregation is triggered on window close and all aggregation
> results
> > > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > > aggregation is configured to only emit final results." for
> > > trigger(Emitted.onWindowClose())?
> > >
> > > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna 
> > wrote:
> > >
> > >> Hi Hao,
> > >>
> > >> Thank you for the KIP!
> > >>
> > >> Regarding option 1, I would not use `Emitted.onWindowClose()` since
> that
> > >> does not seem compatible with the proposed flow. Conceptually, now the
> > >> flow states that the aggre

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Guozhang Wang
I see what you mean now, and I think it's a fair point that composing
`trigger` and `emitted` seems awkward.

Re: data-processing operators vs. control operators, I share your concern as
well, and here's my train of thought: having only data-processing operators
was my primary motivation for how we added the suppress operator --- it
indeed "suppresses" data. But in hindsight its disadvantage is that, for
example, Suppressed.untilWindowCloses() relates only to an earlier
windowedBy operator, which is possibly very far from it in the resulting
DSL code. It's not only a bit awkward for users to write such code, but in
such cases the DSL builder also needs to maintain and propagate this
information to the suppress operator further down. So we are now thinking
about "putting the control object as close as possible to where the related
processing really happens". In that world my original preference was
somewhere in option 2), i.e. just put the control as a param of the related
"windowedBy" operator, but the trade-off is that we keep adding overloaded
functions to these operators. So after some back-and-forth I'm leaning
towards relaxing our principle of having only processing operators and no
flow-control operators. That being said, if you have any ideas that give us
the benefits of both worlds, I'm all ears.
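
To illustrate the point about the builder having to carry window information
down to a distant suppress operator, here is a minimal sketch of a lookup
that walks from a downstream node back towards the source until it finds the
window definition. `OpNode` and its fields are hypothetical; this is not how
the Kafka Streams internal graph is implemented, only a toy model of the
dependency being described:

```java
import java.util.Optional;

// Hypothetical topology-node model; not the Kafka Streams internal graph classes.
final class OpNode {
    final String name;
    final OpNode parent;           // null for the source node
    final String windowDefinition; // non-null only on the windowing node

    OpNode(String name, OpNode parent, String windowDefinition) {
        this.name = name;
        this.parent = parent;
        this.windowDefinition = windowDefinition;
    }

    // Walks towards the source until a node carrying a window definition is found.
    Optional<String> findWindowDefinition() {
        for (OpNode node = this; node != null; node = node.parent) {
            if (node.windowDefinition != null) {
                return Optional.of(node.windowDefinition);
            }
        }
        return Optional.empty();
    }
}
```

In a chain like source -> groupBy -> windowedBy -> aggregate -> mapValues ->
suppress, the suppress node has to traverse three intermediate nodes to
learn which windows it is waiting on; configuring the emit behavior right
next to windowedBy removes that distance.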

Re: using a direct function like "windowedStream.onWindowClose()" vs.
"windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
for the latter is extensibility without adding more functions in the
future. If people feel this is not worth it, we can do the first option as
well. If we just feel that `trigger` and `emitted` do not compose well
together, maybe we can consider something like
`windowedStream.trigger(Triggered.onWindowClose())`?

Re: windowedBy vs. windowBy, yeah I do not really have a good reason why
we should use the past tense either :P But if it's not bothering people much
I'd say we just keep it rather than deprecating it and adding renamed APIs.


On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna  wrote:

> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I relate `Emitted` with the aggregation and not with
> `trigger()` in the API flow, because the aggregation emits the result
> not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> of the config object passed to `trigger()`.
>
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> > Hi Bruno,
> >
> > Could you elaborate a bit more here, what's the semantic difference
> between
> > "the aggregation is triggered on window close and all aggregation results
> > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > aggregation is configured to only emit final results." for
> > trigger(Emitted.onWindowClose())?
> >
> > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Hao,
> >>
> >> Thank you for the KIP!
> >>
> >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >> does not seem compatible with the proposed flow. Conceptually, now the
> >> flow states that the aggregation is triggered on window close and all
> >> aggregation results are emitted. `Emitted` suggests that the aggregation
> >> is configured to only emit final results.
> >>
> >> Thus, I propose the following:
> >>
> >> stream
> >>   .groupBy(..)
> >>   .windowedBy(..)
> >>   .trigger(TriggerParameters.onWindowClose())
> >>   .aggregate(..) //result in a KTable>
> >>   .mapValues(..)
> >>
> >> An alternative to `trigger()` could be `schedule()`, but I do not really
> >> like it.
> >>
> >> One thing I noticed with option 1 is that all other methods in the
> >> example above are operations on data. `groupBy()` groups, `windowedBy()`
> >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> >> values, even `suppress()` suppresses intermediate results. But what does
> >> `trigger()` do? `trigger()` seems a config lost among operations.
> >>
> >> However, if we do not want to restrict ourselves to only use methods
> >> when we want to specify operations on data, I have the following
> proposal:
> >>
> >> stream
> >>   .groupBy(..)
> >>   .windowedBy(..)
> >>   .onWindowClose()
> >>   .aggregate(..) //result in a KTable>
> >>   .mapValues(..)
> >>
> >> Best,
> >> Bruno
> >>
> >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >> operations also use present tense.
> >>
> >> On 22.03.22 06:36, Hao Li wrote:
> >>> Hi John,
> >>>
> >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> >>> different meaning in our case. `emit` sounds like an action to emit?
> How
> >>> about `emitTrigger`? I'm open to suggestions for the naming.
> >>>
> >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we
> >> can
> >>> deprecate `Suppressed` config as a whole later. Or we can deprecate
> >>> `Suppressed.untilWindowClose` in later KIP after implementation of em

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Hao Li
Hi Bruno, Guozhang,

I think `Emitted.onWindowClose` is inspired by `Suppressed.untilWindowClose`.
Given that `Suppressed.untilWindowClose` is used inside `suppress()`,
`Suppressed` sounds good. However, since `trigger()` is a builder-style
setting rather than an action, `Emitted` may be a bit off.

How about using `emitTrigger()` as the function name? Benefits are:
1. not to be confused with Flink's `trigger` as John mentioned.
2. make it sound like a setting instead of an action. `trigger()` sounds
like we are triggering some action.

For the config type name, how about `EmitConfig` or `EmitTriggerConfig`, to
make it clear it's a config for the emit trigger?

Hao

On Tue, Mar 22, 2022 at 9:43 AM Bruno Cadonna  wrote:

> Hi Guozhang,
>
> There is no semantic difference. It is a cosmetic difference.
> Conceptually, I relate `Emitted` with the aggregation and not with
> `trigger()` in the API flow, because the aggregation emits the result
> not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
> of the config object passed to `trigger()`.
>
>
> Best,
> Bruno
>
> On 22.03.22 17:24, Guozhang Wang wrote:
> > Hi Bruno,
> >
> > Could you elaborate a bit more here, what's the semantic difference
> between
> > "the aggregation is triggered on window close and all aggregation results
> > are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
> > aggregation is configured to only emit final results." for
> > trigger(Emitted.onWindowClose())?
> >
> > On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Hao,
> >>
> >> Thank you for the KIP!
> >>
> >> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> >> does not seem compatible with the proposed flow. Conceptually, now the
> >> flow states that the aggregation is triggered on window close and all
> >> aggregation results are emitted. `Emitted` suggests that the aggregation
> >> is configured to only emit final results.
> >>
> >> Thus, I propose the following:
> >>
> >> stream
> >>   .groupBy(..)
> >>   .windowedBy(..)
> >>   .trigger(TriggerParameters.onWindowClose())
> >>   .aggregate(..) //result in a KTable>
> >>   .mapValues(..)
> >>
> >> An alternative to `trigger()` could be `schedule()`, but I do not really
> >> like it.
> >>
> >> One thing I noticed with option 1 is that all other methods in the
> >> example above are operations on data. `groupBy()` groups, `windowedBy()`
> >> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> >> values, even `suppress()` suppresses intermediate results. But what does
> >> `trigger()` do? `trigger()` seems a config lost among operations.
> >>
> >> However, if we do not want to restrict ourselves to only use methods
> >> when we want to specify operations on data, I have the following
> proposal:
> >>
> >> stream
> >>   .groupBy(..)
> >>   .windowedBy(..)
> >>   .onWindowClose()
> >>   .aggregate(..) //result in a KTable>
> >>   .mapValues(..)
> >>
> >> Best,
> >> Bruno
> >>
> >> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> >> operations also use present tense.
> >>
> >> On 22.03.22 06:36, Hao Li wrote:
> >>> Hi John,
> >>>
> >>> Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> >>> different meaning in our case. `emit` sounds like an action to emit?
> How
> >>> about `emitTrigger`? I'm open to suggestions for the naming.
> >>>
> >>> For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we
> >> can
> >>> deprecate `Suppressed` config as a whole later. Or we can deprecate
> >>> `Suppressed.untilWindowClose` in later KIP after implementation of emit
> >>> final is done.
> >>>
> >>> BTW, isn't
> >>>
> >>> stream
> >>> .groupBy(..)
> >>> .windowBy(..)
> >>> .aggregate(..) //result in a KTable>
> >>> .mapValues(..)
> >>> .suppress(Suppressed.untilWindowClose) // since we can trace back
> to
> >>> parent node, to find a window definition
> >>>
> >>> same as
> >>>
> >>> stream
> >>> .groupBy(..)
> >>> .windowBy(..)
> >>> .trigger(Emitted.onWindowClose)
> >>> .aggregate(..) //result in a KTable>
> >>> .mapValues(..)
> >>> ?
> >>>
> >>> Hao
> >>>
> >>>
> >>> On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang wrote:
> >>>
> >>>> I think the following case is only doable via `suppress`:
> >>>>
> >>>> stream
> >>>> .groupBy(..)
> >>>> .windowBy(..)
> >>>> .aggregate(..) //result in a KTable>
> >>>> .mapValues(..)
> >>>> .suppress(Suppressed.untilWindowClose) // since we can trace back to
> >>>> parent node, to find a window definition
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Mar 21, 2022 at 6:36 PM John Roesler wrote:
> >>>>
> >>>>> Thanks, Guozhang!
> >>>>>
> >>>>> To clarify, I was asking specifically about deprecating just the method
> >>>>> ‘untilWindowClose’. I might not be thinking clearly about it, though. What
> >>>>> does untilWindowClose do that this KIP doesn’t co

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Bruno Cadonna

Hi Guozhang,

There is no semantic difference; it is a cosmetic one.
Conceptually, I relate `Emitted` to the aggregation and not to
`trigger()` in the API flow, because the aggregation emits the result,
not `trigger()`. Therefore, I proposed not using `Emitted` as the name
of the config object passed to `trigger()`.



Best,
Bruno

On 22.03.22 17:24, Guozhang Wang wrote:

Hi Bruno,

Could you elaborate a bit more here, what's the semantic difference between
"the aggregation is triggered on window close and all aggregation results
are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
aggregation is configured to only emit final results." for
trigger(Emitted.onWindowClose())?

On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna  wrote:


Hi Hao,

Thank you for the KIP!

Regarding option 1, I would not use `Emitted.onWindowClose()` since that
does not seem compatible with the proposed flow. Conceptually, now the
flow states that the aggregation is triggered on window close and all
aggregation results are emitted. `Emitted` suggests that the aggregation
is configured to only emit final results.

Thus, I propose the following:

stream
  .groupBy(..)
  .windowedBy(..)
  .trigger(TriggerParameters.onWindowClose())
  .aggregate(..) //result in a KTable>
  .mapValues(..)

An alternative to `trigger()` could be `schedule()`, but I do not really
like it.

One thing I noticed with option 1 is that all other methods in the
example above are operations on data. `groupBy()` groups, `windowedBy()`
partitions, `aggregate()` computes the aggregate, `mapValues()` maps
values, even `suppress()` suppresses intermediate results. But what does
`trigger()` do? `trigger()` seems a config lost among operations.

However, if we do not want to restrict ourselves to only use methods
when we want to specify operations on data, I have the following proposal:

stream
  .groupBy(..)
  .windowedBy(..)
  .onWindowClose()
  .aggregate(..) //result in a KTable>
  .mapValues(..)

Best,
Bruno

P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
operations also use present tense.

On 22.03.22 06:36, Hao Li wrote:

Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action to emit? How
about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we can
deprecate `Suppressed` config as a whole later. Or we can deprecate
`Suppressed.untilWindowClose` in later KIP after implementation of emit
final is done.

BTW, isn't

stream
.groupBy(..)
.windowBy(..)
.aggregate(..) //result in a KTable>
.mapValues(..)
.suppress(Suppressed.untilWindowClose) // since we can trace back to
parent node, to find a window definition

same as

stream
.groupBy(..)
.windowBy(..)
.trigger(Emitted.onWindowClose)
.aggregate(..) //result in a KTable>
.mapValues(..)
?

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang wrote:

I think the following case is only doable via `suppress`:

stream
.groupBy(..)
.windowBy(..)
.aggregate(..) //result in a KTable>
.mapValues(..)
.suppress(Suppressed.untilWindowClose) // since we can trace back to
parent node, to find a window definition


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler wrote:

Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method
‘untilWindowClose’. I might not be thinking clearly about it, though. What
does untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:

Just my 2c: Suppressed is in `suppress` whose application scope is much
larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
(but internally we would check whether certain emit policies like
`untilWindowClose` is valid or not), whereas `trigger` as for now is only
applicable in XXWindowedKStream. So I think it would not be completely
replacing Suppressed.untilWindowClose.

In the future, personally I'd still want to keep one control object still
for all emit policies, and maybe if we have extended Emitted for other
emitting policies covered by Suppressed today, we can discuss if we could
have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)`
as a whole, but for this KIP I think it's too early.


Guozhang


On Mon, Mar 21, 2022 at 6:18 PM John Roesler wrote:

Hi all,

Thanks for the Kip, Hao!

For what it’s worth, I’m also in favor of your latest framing of the API,

I think the name is fine. I assume it’s inspired by Flink? It’s not
identical to the concept of a trigger in Flink, which specifies when to
evaluate the window, which might be confusing to some people who have deep
experience with Flink. Then again, it seems close enough that it should be
clear to casual Flink users. For people wi

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Guozhang Wang
Hi Bruno,

Could you elaborate a bit more here? What's the semantic difference between
"the aggregation is triggered on window close and all aggregation results
are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
aggregation is configured to only emit final results." for
trigger(Emitted.onWindowClose())?

On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna  wrote:

> Hi Hao,
>
> Thank you for the KIP!
>
> Regarding option 1, I would not use `Emitted.onWindowClose()` since that
> does not seem compatible with the proposed flow. Conceptually, now the
> flow states that the aggregation is triggered on window close and all
> aggregation results are emitted. `Emitted` suggests that the aggregation
> is configured to only emit final results.
>
> Thus, I propose the following:
>
> stream
>  .groupBy(..)
>  .windowedBy(..)
>  .trigger(TriggerParameters.onWindowClose())
>  .aggregate(..) //result in a KTable>
>  .mapValues(..)
>
> An alternative to `trigger()` could be `schedule()`, but I do not really
> like it.
>
> One thing I noticed with option 1 is that all other methods in the
> example above are operations on data. `groupBy()` groups, `windowedBy()`
> partitions, `aggregate()` computes the aggregate, `mapValues()` maps
> values, even `suppress()` suppresses intermediate results. But what does
> `trigger()` do? `trigger()` seems a config lost among operations.
>
> However, if we do not want to restrict ourselves to only use methods
> when we want to specify operations on data, I have the following proposal:
>
> stream
>  .groupBy(..)
>  .windowedBy(..)
>  .onWindowClose()
>  .aggregate(..) //result in a KTable>
>  .mapValues(..)
>
> Best,
> Bruno
>
> P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
> operations also use present tense.
>
> On 22.03.22 06:36, Hao Li wrote:
> > Hi John,
> >
> > Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
> > different meaning in our case. `emit` sounds like an action to emit? How
> > about `emitTrigger`? I'm open to suggestions for the naming.
> >
> > For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we
> can
> > deprecate `Suppressed` config as a whole later. Or we can deprecate
> > `Suppressed.untilWindowClose` in later KIP after implementation of emit
> > final is done.
> >
> > BTW, isn't
> >
> > stream
> >.groupBy(..)
> >.windowBy(..)
> >.aggregate(..) //result in a KTable>
> >.mapValues(..)
> >.suppress(Suppressed.untilWindowClose) // since we can trace back to
> > parent node, to find a window definition
> >
> > same as
> >
> > stream
> >.groupBy(..)
> >.windowBy(..)
> >.trigger(Emitted.onWindowClose)
> >.aggregate(..) //result in a KTable>
> >.mapValues(..)
> > ?
> >
> > Hao
> >
> >
> > On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang 
> wrote:
> >
> >> I think the following case is only doable via `suppress`:
> >>
> >> stream
> >>.groupBy(..)
> >>.windowBy(..)
> >>.aggregate(..) //result in a KTable>
> >>.mapValues(..)
> >>.suppress(Suppressed.untilWindowClose) // since we can trace back to
> >> parent node, to find a window definition
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Mar 21, 2022 at 6:36 PM John Roesler 
> wrote:
> >>
> >>> Thanks, Guozhang!
> >>>
> >>> To clarify, I was asking specifically about deprecating just the method
> >>> ‘untilWindowClose’. I might not be thinking clearly about it, though.
> >> What
> >>> does untilWindowClose do that this KIP doesn’t cover?
> >>>
> >>> Thanks,
> >>> John
> >>>
> >>> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
>  Just my 2c: Suppressed is in `suppress` whose application scope is
> much
>  larger and hence more flexible. I.e. it can be used anywhere for a
> >>> `KTable`
>  (but internally we would check whether certain emit policies like
>  `untilWindowClose` is valid or not), whereas `trigger` as for now is
> >> only
>  applicable in XXWindowedKStream. So I think it would not be completely
>  replacing Suppressed.untilWindowClose.
> 
>  In the future, personally I'd still want to keep one control object
> >> still
>  for all emit policies, and maybe if we have extended Emitted for other
>  emitting policies covered by Suppressed today, we can discuss if we
> >> could
>  have `KTable.suppress(Emitted..)` replacing
> >>> `KTable.suppress(Suppressed..)`
>  as a whole, but for this KIP I think it's too early.
> 
> 
>  Guozhang
> 
> 
>  On Mon, Mar 21, 2022 at 6:18 PM John Roesler 
> >>> wrote:
> 
> > Hi all,
> >
> > Thanks for the Kip, Hao!
> >
> > For what it’s worth, I’m also in favor of your latest framing of the
> >>> API,
> >
> > I think the name is fine. I assume it’s inspired by Flink? It’s not
> > identical to the concept of a trigger in Flink, which specifies when
> >> to
> > evaluate the window, which might be confusing to

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-22 Thread Bruno Cadonna

Hi Hao,

Thank you for the KIP!

Regarding option 1, I would not use `Emitted.onWindowClose()` since that 
does not seem compatible with the proposed flow. Conceptually, now the 
flow states that the aggregation is triggered on window close and all 
aggregation results are emitted. `Emitted` suggests that the aggregation 
is configured to only emit final results.


Thus, I propose the following:

stream
.groupBy(..)
.windowedBy(..)
.trigger(TriggerParameters.onWindowClose())
.aggregate(..) //result in a KTable>
.mapValues(..)

An alternative to `trigger()` could be `schedule()`, but I do not really 
like it.


One thing I noticed with option 1 is that all other methods in the 
example above are operations on data. `groupBy()` groups, `windowedBy()` 
partitions, `aggregate()` computes the aggregate, `mapValues()` maps 
values, even `suppress()` suppresses intermediate results. But what does 
`trigger()` do? `trigger()` seems a config lost among operations.


However, if we do not want to restrict ourselves to only use methods 
when we want to specify operations on data, I have the following proposal:


stream
.groupBy(..)
.windowedBy(..)
.onWindowClose()
.aggregate(..) //result in a KTable>
.mapValues(..)

Best,
Bruno

P.S.: Why is it `windowedBy()` and not `windowBy()`? All other 
operations also use present tense.


On 22.03.22 06:36, Hao Li wrote:

Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action to emit? How
about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang we can
deprecate `Suppressed` config as a whole later. Or we can deprecate
`Suppressed.untilWindowClose` in later KIP after implementation of emit
final is done.

BTW, isn't

stream
   .groupBy(..)
   .windowBy(..)
   .aggregate(..) //result in a KTable>
   .mapValues(..)
   .suppress(Suppressed.untilWindowClose) // since we can trace back to
parent node, to find a window definition

same as

stream
   .groupBy(..)
   .windowBy(..)
   .trigger(Emitted.onWindowClose)
   .aggregate(..) //result in a KTable>
   .mapValues(..)
?

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang  wrote:


I think the following case is only doable via `suppress`:

stream
   .groupBy(..)
   .windowBy(..)
   .aggregate(..) //result in a KTable>
   .mapValues(..)
   .suppress(Suppressed.untilWindowClose) // since we can trace back to
parent node, to find a window definition


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler  wrote:


Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method
‘untilWindowClose’. I might not be thinking clearly about it, though. What
does untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:

Just my 2c: Suppressed is in `suppress` whose application scope is much
larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
(but internally we would check whether certain emit policies like
`untilWindowClose` is valid or not), whereas `trigger` as for now is only
applicable in XXWindowedKStream. So I think it would not be completely
replacing Suppressed.untilWindowClose.

In the future, personally I'd still want to keep one control object still
for all emit policies, and maybe if we have extended Emitted for other
emitting policies covered by Suppressed today, we can discuss if we could
have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)`
as a whole, but for this KIP I think it's too early.


Guozhang


On Mon, Mar 21, 2022 at 6:18 PM John Roesler wrote:

Hi all,

Thanks for the Kip, Hao!

For what it’s worth, I’m also in favor of your latest framing of the API,

I think the name is fine. I assume it’s inspired by Flink? It’s not
identical to the concept of a trigger in Flink, which specifies when to
evaluate the window, which might be confusing to some people who have deep
experience with Flink. Then again, it seems close enough that it should be
clear to casual Flink users. For people with no other stream processing
experience, it might seem a bit esoteric compared to something
self-documenting like ‘emit()’, but the docs should make it clear.

One small question: it seems like this proposal is identical to
Suppressed.untilWindowClose, and the KIP states that this API is superior.
In that case, should we deprecate Suppressed.untilWindowClose?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:

Hi Hao,

For 2), I think it's a good idea in general to use a separate function on
the Time/SessionWindowedKStream itself, to achieve the same effect that,
for now, the emitting control is only for windowed aggregations as in this
KIP, than overloading existing functions. We can discuss further about the
actual function names, whether others like the

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread Hao Li
Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action to emit? How
about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that we
can deprecate the `Suppressed` config as a whole later. Alternatively, we can
deprecate `Suppressed.untilWindowClose` in a later KIP, after the
implementation of emit-final is done.

BTW, isn't

stream
  .groupBy(..)
  .windowBy(..)
  .aggregate(..) //result in a KTable>
  .mapValues(..)
  .suppress(Suppressed.untilWindowClose) // since we can trace back to the
                                         // parent node to find a window definition

same as

stream
  .groupBy(..)
  .windowBy(..)
  .trigger(Emitted.onWindowClose)
  .aggregate(..) //result in a KTable>
  .mapValues(..)
?
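
To make the two behaviours being compared concrete, here is a minimal sketch of a tumbling-window count that either emits on every update or only once a window closes. It is a toy model, not Kafka Streams code: `EmitDemo`, `Emit`, and `count` are hypothetical names, there is no grace period, and a window is assumed to close as soon as an event at or past its end arrives. Timestamps are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these names are Kafka Streams APIs.
final class EmitDemo {
    enum Emit { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    /**
     * Counts events per tumbling window [start, start + size) and returns the
     * emitted records as "windowStart=count" strings, in emission order.
     */
    static List<String> count(long[] timestamps, long size, Emit emit) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> running count
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            // Close every window whose end is at or before the current stream time.
            while (!open.isEmpty() && open.firstKey() + size <= streamTime) {
                Map.Entry<Long, Integer> closed = open.pollFirstEntry();
                if (emit == Emit.ON_WINDOW_CLOSE) {
                    out.add(closed.getKey() + "=" + closed.getValue()); // final result only
                }
            }
            long start = ts - (ts % size);
            if (start + size <= streamTime) {
                continue; // late event for an already closed window: dropped in this sketch
            }
            int updated = open.merge(start, 1, Integer::sum);
            if (emit == Emit.ON_EACH_UPDATE) {
                out.add(start + "=" + updated); // every intermediate result
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] events = {1, 2, 11, 12, 25};
        System.out.println(count(events, 10, Emit.ON_EACH_UPDATE));  // [0=1, 0=2, 10=1, 10=2, 20=1]
        System.out.println(count(events, 10, Emit.ON_WINDOW_CLOSE)); // [0=2, 10=2]
    }
}
```

In the on-close mode the window starting at 20 never appears, because no later event advances stream time past its end. Whether the final result is produced by the aggregation itself or by a downstream suppression step is exactly the part this sketch does not model.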

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang  wrote:

> I think the following case is only doable via `suppress`:
>
> stream
>   .groupBy(..)
>   .windowBy(..)
>   .aggregate(..) //result in a KTable>
>   .mapValues(..)
>   .suppress(Suppressed.untilWindowClose) // since we can trace back to
> parent node, to find a window definition
>
>
> Guozhang
>
>
> On Mon, Mar 21, 2022 at 6:36 PM John Roesler  wrote:
>
> > Thanks, Guozhang!
> >
> > To clarify, I was asking specifically about deprecating just the method
> > ‘untilWindowClose’. I might not be thinking clearly about it, though.
> What
> > does untilWindowClose do that this KIP doesn’t cover?
> >
> > Thanks,
> > John
> >
> > On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > > Just my 2c: Suppressed is in `suppress` whose application scope is much
> > > larger and hence more flexible. I.e. it can be used anywhere for a
> > `KTable`
> > > (but internally we would check whether certain emit policies like
> > > `untilWindowClose` is valid or not), whereas `trigger` as for now is
> only
> > > applicable in XXWindowedKStream. So I think it would not be completely
> > > replacing Suppressed.untilWindowClose.
> > >
> > > In the future, personally I'd still want to keep one control object
> still
> > > for all emit policies, and maybe if we have extended Emitted for other
> > > emitting policies covered by Suppressed today, we can discuss if we
> could
> > > have `KTable.suppress(Emitted..)` replacing
> > `KTable.suppress(Suppressed..)`
> > > as a whole, but for this KIP I think it's too early.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 21, 2022 at 6:18 PM John Roesler 
> > wrote:
> > >
> > >> Hi all,
> > >>
> > >> Thanks for the Kip, Hao!
> > >>
> > >> For what it’s worth, I’m also in favor of your latest framing of the
> > API,
> > >>
> > >> I think the name is fine. I assume it’s inspired by Flink? It’s not
> > >> identical to the concept of a trigger in Flink, which specifies when
> to
> > >> evaluate the window, which might be confusing to some people who have
> > deep
> > >> experience with Flink. Then again, it seems close enough that it
> should
> > be
> > >> clear to casual Flink users. For people with no other stream
> processing
> > >> experience, it might seem a bit esoteric compared to something
> > >> self-documenting like ‘emit()’, but the docs should  make it clear.
> > >>
> > >> One small question: it seems like this proposal is identical to
> > >> Suppressed.untilWindowClose, and the KIP states that this API is
> > superior.
> > >> In that case, should we deprecate Suppressed.untilWindowClose?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> > >> > Hi Hao,
> > >> >
> > >> > For 2), I think it's a good idea in general to use a separate
> > function on
> > >> > the Time/SessionWindowedKStream itself, to achieve the same effect
> > that,
> > >> > for now, the emitting control is only for windowed aggregations as
> in
> > >> this
> > >> > KIP, than overloading existing functions. We can discuss further
> about
> > >> the
> > >> > actual function names, whether others like the name `trigger` or
> not.
> > As
> > >> > for myself I feel `trigger` is a good one but I'd like to see if
> > others
> > >> > have opinions as well.
> > >> >
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li 
> > wrote:
> > >> >
> > >> >> Hi Guozhang,
> > >> >>
> > >> >> Thanks for the feedback.
> > >> >>
> > >> >> 1. I agree to have an `Emitted` control class and two static
> > >> constructors
> > >> >> named `onWindowClose` and `onEachUpdate`.
> > >> >>
> > >> >> 2. For the API function changes, I'm thinking of adding a new
> > function
> > >> >> called `trigger` to `TimeWindowedKStream` and
> > `SessionWindowedKStream`.
> > >> It
> > >> >> takes `Emitted` config and returns the same stream. Example:
> > >> >>
> > >> >> stream
> > >> >>   .groupBy(...)
> > >> >>   .windowedBy(...)
> > >> >>   .trigger(Emitted.onWindowClose). // N
> > >> >>   .count()
> > >> >>
> > >> >> The benefits are:
> > >> >>   1. It's simple and avoids creating overloading of existing
> > functions

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread Guozhang Wang
I think the following case is only doable via `suppress`:

stream
  .groupBy(..)
  .windowBy(..)
  .aggregate(..) //result in a KTable>
  .mapValues(..)
  .suppress(Suppressed.untilWindowClose) // since we can trace back to the
                                         // parent node to find a window definition


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler  wrote:

> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method
> ‘untilWindowClose’. I might not be thinking clearly about it, though. What
> does untilWindowClose do that this KIP doesn’t cover?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > Just my 2c: Suppressed is in `suppress` whose application scope is much
> > larger and hence more flexible. I.e. it can be used anywhere for a
> `KTable`
> > (but internally we would check whether certain emit policies like
> > `untilWindowClose` is valid or not), whereas `trigger` as for now is only
> > applicable in XXWindowedKStream. So I think it would not be completely
> > replacing Suppressed.untilWindowClose.
> >
> > In the future, personally I'd still want to keep one control object still
> > for all emit policies, and maybe if we have extended Emitted for other
> > emitting policies covered by Suppressed today, we can discuss if we could
> > have `KTable.suppress(Emitted..)` replacing
> `KTable.suppress(Suppressed..)`
> > as a whole, but for this KIP I think it's too early.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 21, 2022 at 6:18 PM John Roesler 
> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the Kip, Hao!
> >>
> >> For what it’s worth, I’m also in favor of your latest framing of the
> API,
> >>
> >> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >> identical to the concept of a trigger in Flink, which specifies when to
> >> evaluate the window, which might be confusing to some people who have
> deep
> >> experience with Flink. Then again, it seems close enough that it should
> be
> >> clear to casual Flink users. For people with no other stream processing
> >> experience, it might seem a bit esoteric compared to something
> >> self-documenting like ‘emit()’, but the docs should  make it clear.
> >>
> >> One small question: it seems like this proposal is identical to
> >> Suppressed.untilWindowClose, and the KIP states that this API is
> superior.
> >> In that case, should we deprecate Suppressed.untilWindowClose?
> >>
> >> Thanks,
> >> John
> >>
> >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >> > Hi Hao,
> >> >
> >> > For 2), I think it's a good idea in general to use a separate
> function on
> >> > the Time/SessionWindowedKStream itself, to achieve the same effect
> that,
> >> > for now, the emitting control is only for windowed aggregations as in
> >> this
> >> > KIP, than overloading existing functions. We can discuss further about
> >> the
> >> > actual function names, whether others like the name `trigger` or not.
> As
> >> > for myself I feel `trigger` is a good one but I'd like to see if
> others
> >> > have opinions as well.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li 
> wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Thanks for the feedback.
> >> >>
> >> >> 1. I agree to have an `Emitted` control class and two static
> >> constructors
> >> >> named `onWindowClose` and `onEachUpdate`.
> >> >>
> >> >> 2. For the API function changes, I'm thinking of adding a new
> function
> >> >> called `trigger` to `TimeWindowedKStream` and
> `SessionWindowedKStream`.
> >> It
> >> >> takes `Emitted` config and returns the same stream. Example:
> >> >>
> >> >> stream
> >> >>   .groupBy(...)
> >> >>   .windowedBy(...)
> >> >>   .trigger(Emitted.onWindowClose). // N
> >> >>   .count()
> >> >>
> >> >> The benefits are:
> >> >>   1. It's simple and avoids creating overloading of existing
> functions
> >> like
> >> >> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it
> to
> >> >> `aggregate` functions, we need to add it to all existing `count`,
> >> >> `aggregate` overloading functions which is a lot.
> >> >>   2. It operates directly on windowed kstream and tells how its
> output
> >> >> should be configured, if later we need to add this other type of
> >> streams,
> >> >> we can reuse same `trigger` API whereas other type of streams/tables
> may
> >> >> not have `aggregate`, `windowedby` api to make it consistent.
> >> >>
> >> >> Hao
> >> >>
> >> >>
> >> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang 
> >> wrote:
> >> >>
> >> >> > Hello Hao,
> >> >> >
> >> >> > I'm preferring option 2 over the other options mainly because the
> >> added
> >> >> > config object could potentially be used in other operators as well
> >> (not
> >> >> > necessarily has to be a windowed operator and hence have to be
> >> >> piggy-backed
> >> >> > on `windowedBy`, and that's also why I suggested not naming it
> >> >> > `WindowConfig` but just `EmitConfig`).
> >> >> >
> >> >> > As for Matthias' questio

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread John Roesler
Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method 
‘untilWindowClose’. I might not be thinking clearly about it, though. What does 
untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: Suppressed is in `suppress` whose application scope is much
> larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
> (but internally we would check whether certain emit policies like
> `untilWindowClose` is valid or not), whereas `trigger` as for now is only
> applicable in XXWindowedKStream. So I think it would not be completely
> replacing Suppressed.untilWindowClose.
>
> In the future, personally I'd still want to keep one control object still
> for all emit policies, and maybe if we have extended Emitted for other
> emitting policies covered by Suppressed today, we can discuss if we could
> have `KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)`
> as a whole, but for this KIP I think it's too early.
>
>
> Guozhang
>
>
> On Mon, Mar 21, 2022 at 6:18 PM John Roesler  wrote:
>
>> Hi all,
>>
>> Thanks for the Kip, Hao!
>>
>> For what it’s worth, I’m also in favor of your latest framing of the API,
>>
>> I think the name is fine. I assume it’s inspired by Flink? It’s not
>> identical to the concept of a trigger in Flink, which specifies when to
>> evaluate the window, which might be confusing to some people who have deep
>> experience with Flink. Then again, it seems close enough that it should be
>> clear to casual Flink users. For people with no other stream processing
>> experience, it might seem a bit esoteric compared to something
>> self-documenting like ‘emit()’, but the docs should  make it clear.
>>
>> One small question: it seems like this proposal is identical to
>> Suppressed.untilWindowClose, and the KIP states that this API is superior.
>> In that case, should we deprecate Suppressed.untilWindowClose?
>>
>> Thanks,
>> John
>>
>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
>> > Hi Hao,
>> >
>> > For 2), I think it's a good idea in general to use a separate function on
>> > the Time/SessionWindowedKStream itself, to achieve the same effect that,
>> > for now, the emitting control is only for windowed aggregations as in
>> this
>> > KIP, than overloading existing functions. We can discuss further about
>> the
>> > actual function names, whether others like the name `trigger` or not. As
>> > for myself I feel `trigger` is a good one but I'd like to see if others
>> > have opinions as well.
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li  wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for the feedback.
>> >>
>> >> 1. I agree to have an `Emitted` control class and two static
>> constructors
>> >> named `onWindowClose` and `onEachUpdate`.
>> >>
>> >> 2. For the API function changes, I'm thinking of adding a new function
>> >> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`.
>> It
>> >> takes `Emitted` config and returns the same stream. Example:
>> >>
>> >> stream
>> >>   .groupBy(...)
>> >>   .windowedBy(...)
>> >>   .trigger(Emitted.onWindowClose). // N
>> >>   .count()
>> >>
>> >> The benefits are:
>> >>   1. It's simple and avoids creating overloading of existing functions
>> like
>> >> `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it to
>> >> `aggregate` functions, we need to add it to all existing `count`,
>> >> `aggregate` overloading functions which is a lot.
>> >>   2. It operates directly on windowed kstream and tells how its output
>> >> should be configured, if later we need to add this other type of
>> streams,
>> >> we can reuse same `trigger` API whereas other type of streams/tables may
>> >> not have `aggregate`, `windowedby` api to make it consistent.
>> >>
>> >> Hao
>> >>
>> >>
>> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang 
>> wrote:
>> >>
>> >> > Hello Hao,
>> >> >
>> >> > I'm preferring option 2 over the other options mainly because the
>> added
>> >> > config object could potentially be used in other operators as well
>> (not
>> >> > necessarily has to be a windowed operator and hence have to be
>> >> piggy-backed
>> >> > on `windowedBy`, and that's also why I suggested not naming it
>> >> > `WindowConfig` but just `EmitConfig`).
>> >> >
>> >> > As for Matthias' question, I think the difference between the windowed
>> >> > aggregate operator and the stream-stream join operator is that, for
>> the
>> >> > latter we think emit-final should be the only right emitting policy
>> and
>> >> > hence we should not let users to configure it. If users configure it
>> to
>> >> > e.g. emit eager they may get the old spurious emitting behavior which
>> is
>> >> > violating the semantics.
>> >> >
>> >> > For option 2) itself, I have a few more thoughts:
>> >> >
>> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
>> >> > towards adding the new param i

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread Guozhang Wang
Just my 2c: `Suppressed` is used in `suppress`, whose application scope is much
larger and hence more flexible. I.e., it can be used anywhere on a `KTable`
(but internally we would check whether certain emit policies like
`untilWindowClose` are valid or not), whereas `trigger` as of now is only
applicable in XXWindowedKStream. So I think it would not completely
replace Suppressed.untilWindowClose.

In the future, personally I'd still want to keep one control object
for all emit policies, and maybe once we have extended Emitted to cover the
other emitting policies handled by Suppressed today, we can discuss whether
`KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)`
as a whole, but for this KIP I think it's too early.


Guozhang


On Mon, Mar 21, 2022 at 6:18 PM John Roesler  wrote:

> Hi all,
>
> Thanks for the KIP, Hao!
>
> For what it’s worth, I’m also in favor of your latest framing of the API.
>
> I think the name is fine. I assume it’s inspired by Flink? It’s not
> identical to the concept of a trigger in Flink, which specifies when to
> evaluate the window, and that might be confusing to people who have deep
> experience with Flink. Then again, it seems close enough that it should be
> clear to casual Flink users. For people with no other stream processing
> experience, it might seem a bit esoteric compared to something
> self-documenting like ‘emit()’, but the docs should make it clear.
>
> One small question: it seems like this proposal is identical to
> Suppressed.untilWindowClose, and the KIP states that this API is superior.
> In that case, should we deprecate Suppressed.untilWindowClose?
>
> Thanks,
> John

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread John Roesler
Hi all,

Thanks for the KIP, Hao!

For what it’s worth, I’m also in favor of your latest framing of the API.

I think the name is fine. I assume it’s inspired by Flink? It’s not identical
to the concept of a trigger in Flink, which specifies when to evaluate the
window, and that might be confusing to people who have deep experience with
Flink. Then again, it seems close enough that it should be clear to casual
Flink users. For people with no other stream processing experience, it might
seem a bit esoteric compared to something self-documenting like ‘emit()’, but
the docs should make it clear.

One small question: it seems like this proposal is identical to 
Suppressed.untilWindowClose, and the KIP states that this API is superior. In 
that case, should we deprecate Suppressed.untilWindowClose?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> Hi Hao,
>
> For 2), I think it's a good idea in general to use a separate function on
> the Time/SessionWindowedKStream itself rather than overloading existing
> functions. It achieves the same effect, since for now the emitting control
> only applies to windowed aggregations, as in this KIP. We can discuss the
> actual function name further, and whether others like `trigger` or not. As
> for myself, I feel `trigger` is a good one, but I'd like to see if others
> have opinions as well.
>
>
> Guozhang

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread Guozhang Wang
Hi Hao,

For 2), I think it's a good idea in general to use a separate function on
the Time/SessionWindowedKStream itself rather than overloading existing
functions. It achieves the same effect, since for now the emitting control
only applies to windowed aggregations, as in this KIP. We can discuss the
actual function name further, and whether others like `trigger` or not. As
for myself, I feel `trigger` is a good one, but I'd like to see if others
have opinions as well.


Guozhang

On Mon, Mar 21, 2022 at 5:18 PM Hao Li  wrote:

> Hi Guozhang,
>
> Thanks for the feedback.
>
> 1. I agree to have an `Emitted` control class and two static constructors
> named `onWindowClose` and `onEachUpdate`.
>
> 2. For the API function changes, I'm thinking of adding a new function
> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
> takes `Emitted` config and returns the same stream. Example:
>
> stream
>   .groupBy(...)
>   .windowedBy(...)
>   .trigger(Emitted.onWindowClose()) // N
>   .count()
>
> The benefits are:
>   1. It's simple and avoids adding overloads to existing functions like
> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
> aggregation functions, we would need to add it to all existing `count` and
> `aggregate` overloads, which is a lot.
>   2. It operates directly on the windowed KStream and tells how its output
> should be configured. If we later need to add this to other types of streams,
> we can reuse the same `trigger` API, whereas other types of streams/tables may
> not have an `aggregate` or `windowedBy` API to keep it consistent.
>
> Hao

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-21 Thread Hao Li
Hi Guozhang,

Thanks for the feedback.

1. I agree to have an `Emitted` control class and two static constructors
named `onWindowClose` and `onEachUpdate`.

2. For the API function changes, I'm thinking of adding a new function
called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
takes `Emitted` config and returns the same stream. Example:

stream
  .groupBy(...)
  .windowedBy(...)
  .trigger(Emitted.onWindowClose()) // N
  .count()

The benefits are:
  1. It's simple and avoids adding overloads to existing functions like
`windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
aggregation functions, we would need to add it to all existing `count` and
`aggregate` overloads, which is a lot.
  2. It operates directly on the windowed KStream and tells how its output
should be configured. If we later need to add this to other types of streams,
we can reuse the same `trigger` API, whereas other types of streams/tables may
not have an `aggregate` or `windowedBy` API to keep it consistent.

Hao
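
For readers following the thread, the first benefit above can be sketched in plain Java without Kafka Streams on the classpath. This is a toy model under stated assumptions: `WindowedStream` and `Emitted` here are hypothetical stand-ins (not the real `TimeWindowedKStream` or any released class), the aggregation methods just return a description string, and defaulting to "on each update" is assumed. The point is only that one `trigger` method returning the stream itself covers `count`, `reduce` and `aggregate` without touching their signatures.

```java
// Toy model only: none of these types are the Kafka Streams classes.
final class TriggerSketch {

    // Hypothetical stand-in for the proposed `Emitted` control class.
    static final class Emitted {
        private final String policy;

        private Emitted(String policy) {
            this.policy = policy;
        }

        static Emitted onWindowClose() {
            return new Emitted("onWindowClose");
        }

        static Emitted onEachUpdate() {
            return new Emitted("onEachUpdate");
        }

        String policy() {
            return policy;
        }
    }

    // Hypothetical stand-in for a windowed stream such as TimeWindowedKStream.
    static final class WindowedStream {
        // Assumed default: emit a result on every update.
        private Emitted emitted = Emitted.onEachUpdate();

        // The single new method: it records the policy and returns the same
        // stream, so the fluent chain continues into any aggregation.
        WindowedStream trigger(Emitted emitted) {
            this.emitted = emitted;
            return this;
        }

        // Existing aggregation entry points keep their signatures unchanged.
        String count() {
            return "count[" + emitted.policy() + "]";
        }

        String reduce() {
            return "reduce[" + emitted.policy() + "]";
        }

        String aggregate() {
            return "aggregate[" + emitted.policy() + "]";
        }
    }

    public static void main(String[] args) {
        System.out.println(new WindowedStream().trigger(Emitted.onWindowClose()).count());
        System.out.println(new WindowedStream().reduce());
    }
}
```

The alternative discussed in the thread, passing the control object into each aggregation, would instead need one extra overload per existing `count`/`reduce`/`aggregate` variant.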


On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang  wrote:

> Hello Hao,
>
> I prefer option 2 over the other options, mainly because the added
> config object could potentially be used in other operators as well (it does
> not necessarily have to be a windowed operator, and hence does not have to be
> piggy-backed on `windowedBy`; that's also why I suggested naming it
> `EmitConfig` rather than `WindowConfig`).
>
> As for Matthias' question, I think the difference between the windowed
> aggregate operator and the stream-stream join operator is that, for the
> latter, we think emit-final should be the only correct emitting policy and
> hence we should not let users configure it. If users configured it to,
> e.g., emit eagerly, they might get the old spurious emitting behavior, which
> violates the semantics.
>
> For option 2) itself, I have a few more thoughts:
>
> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> towards adding the new param in the overloaded `aggregate` rather than the
> overloaded `windowedBy` function. The reason is that the emitting logic could
> be either window based or non-window based, in the long run. Though for
> this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may
> want to extend to other non-windowed operators in the future.
> 2. To be consistent with other control class names, I feel maybe we can
> name it "Emitted", not "EmitConfig".
> 3. Following the first comment, I think we can have the static constructor
> names as "onWindowClose" and "onEachUpdate".
>
> The resulting code pattern would be like this:
>
>    stream
>      .groupBy(..)
>      .windowedBy(TimeWindows..)
>      .count(Emitted.onWindowClose())
>
> WDYT?

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-19 Thread Guozhang Wang
Hello Hao,

I prefer option 2 over the other options, mainly because the added
config object could potentially be used in other operators as well (it does
not necessarily have to be a windowed operator, and hence does not have to be
piggy-backed on `windowedBy`; that's also why I suggested naming it
`EmitConfig` rather than `WindowConfig`).

As for Matthias' question, I think the difference between the windowed
aggregate operator and the stream-stream join operator is that, for the
latter, we think emit-final should be the only correct emitting policy and
hence we should not let users configure it. If users configured it to,
e.g., emit eagerly, they might get the old spurious emitting behavior, which
violates the semantics.

For option 2) itself, I have a few more thoughts:

1. Thinking about Matthias' suggestions, I'm also leaning a bit
towards adding the new param in the overloaded `aggregate` rather than the
overloaded `windowedBy` function. The reason is that the emitting logic could
be either window based or non-window based, in the long run. Though for
this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may
want to extend to other non-windowed operators in the future.
2. To be consistent with other control class names, I feel maybe we can
name it "Emitted", not "EmitConfig".
3. Following the first comment, I think we can have the static constructor
names as "onWindowClose" and "onEachUpdate".

The resulting code pattern would be like this:

   stream
     .groupBy(..)
     .windowedBy(TimeWindows..)
     .count(Emitted.onWindowClose())

WDYT?
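
To make the difference between the two static constructors concrete, here is a small self-contained simulation in plain Java. It is a sketch under assumptions, not the Kafka Streams implementation: the class and method names are hypothetical, windows are tumbling with no grace period, "stream time" is simply the largest timestamp seen so far, and records for already-closed windows are dropped. With the on-each-update policy every change to a window's count is emitted; with the on-window-close policy each window emits exactly one final count once stream time passes its end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy simulation only: a hypothetical model of the two emit policies.
final class EmitSemanticsSketch {

    enum Policy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    // Counts records per tumbling window and returns every emitted result
    // as "windowStart=count", in emission order.
    static List<String> countPerWindow(long[] timestamps, long windowSize, Policy policy) {
        TreeMap<Long, Integer> counts = new TreeMap<>(); // window start -> running count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - Math.floorMod(ts, windowSize);

            // No grace period: a record whose window has already closed is dropped.
            if (start + windowSize <= streamTime) {
                continue;
            }

            int count = counts.merge(start, 1, Integer::sum);

            if (policy == Policy.ON_EACH_UPDATE) {
                // Emit the updated count right away.
                emitted.add(start + "=" + count);
            } else {
                // Emit the final count of every window that stream time has passed.
                while (!counts.isEmpty() && counts.firstKey() + windowSize <= streamTime) {
                    Map.Entry<Long, Integer> closed = counts.pollFirstEntry();
                    emitted.add(closed.getKey() + "=" + closed.getValue());
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 3, 12, 4, 15, 21};
        System.out.println(countPerWindow(timestamps, 10, Policy.ON_EACH_UPDATE));
        System.out.println(countPerWindow(timestamps, 10, Policy.ON_WINDOW_CLOSE));
    }
}
```

For the input above with a window size of 10, the on-each-update run emits `0=1, 0=2, 10=1, 10=2, 20=1`, while the on-window-close run emits only `0=2, 10=2`; the window starting at 20 never closes within this input, so it produces nothing yet.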


On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax  wrote:

> >> `allowedLateness` may not be a good name. What I have in mind is to use
> >> this to control how frequently we try to emit final results. Maybe it's
> >> more flexible to be used as config in properties as we don't need to
> >> recompile DSL to change it.
>
> I see; making it a config seems better. Frankly, I am not even sure if
> we need a config at all or if we can just hard code it? For the
> stream-stream join left/outer join fix, there is only an internal config
> but no public config either.
>
> Option 1: Your proposal is?
>
>    stream
>      .groupByKey()
>      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>      .configure(EmitConfig.emitFinal())
>      .count()
>
> That does not change my argument that it seems to be misplaced from an API
> flow POV.
>
> Option 1 seems to be the least desirable to me.
>
> For options 2 and 3, I am not sure which one I like better. It might be good
> if others could chime in, too. I think I slightly prefer option 2 over
> option 3.
>
>
> -Matthias

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-16 Thread Matthias J. Sax

>> `allowedLateness` may not be a good name. What I have in mind is to use
>> this to control how frequently we try to emit final results. Maybe it's
>> more flexible to be used as config in properties as we don't need to
>> recompile DSL to change it.


I see; making it a config seems better. Frankly, I am not even sure if 
we need a config at all or if we can just hard code it? For the 
stream-stream join left/outer join fix, there is only an internal config 
but no public config either.


Option 1: Your proposal is?

   stream
     .groupByKey()
     .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
     .configure(EmitConfig.emitFinal())
     .count()

That does not change my argument that it seems to be misplaced from an API
flow POV.


Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. It might be good
if others could chime in, too. I think I slightly prefer option 2 over
option 3.



-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:

> Thanks for the feedback Matthias.
>
> `allowedLateness` may not be a good name. What I have in mind is to use
> this to control how frequently we try to emit final results. Maybe it's
> more flexible to be used as config in properties as we don't need to
> recompile DSL to change it.
>
> For option 1, I intend to use `emitFinal` to configure how
> `TimeWindowedKStream` should be outputted to `KTable` after aggregation.
> But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
> Maybe adding `configure(EmitConfig config)` makes more sense?
>
> For option 2, config can be created using `WindowConfig.emitFinal()` or
> `EmitConfig.emitFinal`
>
> For option 3, it will be something like `TimeWindows(..., EmitConfig
> emitConfig)`.
>
> For putting `EmitConfig` in aggregation operator, I think it doesn't
> control how we do aggregation but how we output to `KTable`. That's why I
> feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
> I'm also OK with option 2.
>
> Hao





Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-15 Thread Hao Li
Thanks for the feedback, Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it would be
more flexible as a config in properties, since we would not need to
recompile the DSL code to change it.

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be output to `KTable` after aggregation.
But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, the config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal()`.

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.

As for putting `EmitConfig` in the aggregation operator, I think it doesn't
control how we do the aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense, as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao
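
As a reading aid, the three placements discussed above can be put side by side in a small plain-Java model. Everything here is hypothetical and only mirrors the shapes named in the thread: `EmitConfig`, `Windows`, `GroupedStream` and `WindowedStream` are toy stand-ins, not Kafka Streams types, and `count()` merely reports which behavior would apply. Option 1 sets the config through a separate `configure(...)` call on the windowed stream, option 2 passes it as a second argument to `windowedBy(...)`, and option 3 attaches it to the window definition.

```java
// Toy model only: these classes are hypothetical stand-ins, not Kafka Streams types.
final class EmitConfigPlacementSketch {

    static final class EmitConfig {
        final boolean finalOnly;

        private EmitConfig(boolean finalOnly) {
            this.finalOnly = finalOnly;
        }

        static EmitConfig emitFinal() {
            return new EmitConfig(true);
        }
    }

    static final class Windows {
        final long sizeMs;
        final EmitConfig emitConfig; // null when the window definition carries no emit config

        Windows(long sizeMs) {
            this(sizeMs, null);
        }

        // Option 3: the emit config is part of the window definition.
        Windows(long sizeMs, EmitConfig emitConfig) {
            this.sizeMs = sizeMs;
            this.emitConfig = emitConfig;
        }
    }

    static final class GroupedStream {
        WindowedStream windowedBy(Windows windows) {
            return new WindowedStream(windows.emitConfig);
        }

        // Option 2: the emit config is an extra argument of windowedBy.
        WindowedStream windowedBy(Windows windows, EmitConfig emitConfig) {
            return new WindowedStream(emitConfig);
        }
    }

    static final class WindowedStream {
        private EmitConfig emitConfig;

        WindowedStream(EmitConfig emitConfig) {
            this.emitConfig = emitConfig;
        }

        // Option 1: a separate call on the windowed stream.
        WindowedStream configure(EmitConfig emitConfig) {
            this.emitConfig = emitConfig;
            return this;
        }

        String count() {
            boolean finalOnly = emitConfig != null && emitConfig.finalOnly;
            return finalOnly ? "count: final result per window" : "count: every update";
        }
    }

    public static void main(String[] args) {
        EmitConfig cfg = EmitConfig.emitFinal();
        System.out.println(new GroupedStream().windowedBy(new Windows(1000)).configure(cfg).count());
        System.out.println(new GroupedStream().windowedBy(new Windows(1000), cfg).count());
        System.out.println(new GroupedStream().windowedBy(new Windows(1000, cfg)).count());
        System.out.println(new GroupedStream().windowedBy(new Windows(1000)).count());
    }
}
```

In this model all three placements end up with the same behavior; what differs is which existing signature has to change, which is the trade-off the thread is weighing.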




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> A general comment: it seems that we won't need any new `allowedLateness`
> parameter because the grace-period is defined on the window itself already?
>
> (On the other hand, if I think about it once more, maybe the
> `grace-period` is actually not a property of the window but a property
> of the aggregation operator? _thinking_)
>
> From an API flow point of view, option 1 might not be desirable IMHO:
>
>    stream
>      .groupByKey()
>      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>      .emitFinal()
>      .count()
>
> The call to `emitFinal()` does not seem to be in the right place for this case?
>
>
> Option 2 might work (I think we need to discuss a few details of the API
> though):
>
>   stream
>     .groupByKey()
>     .windowBy(
>       TimeWindow.ofSizeNoGrace(...),
>       EmitConfig.emitFinal() // just made this up; it's not in the KIP
>     )
>     .count()
>
> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> unclear what API you have in mind? `EmitFinalConfig` has no public
> constructor nor any builder method.
>
>
> For option 3, I am not sure what you really have in mind. Can you give
> a concrete example (similar to above) of how users would write their code?
>
>
>
> Did you consider actually passing the `EmitConfig` into the
> aggregation operator? In the end, it seems not to be a property of the
> window definition or windowing step, but a property of the actual operator:
>
>   stream
>     .groupByKey()
>     .windowBy(
>       TimeWindow.ofSizeNoGrace(...)
>     )
>     .count(EmitConfig.emitFinal())
>
> The API surface area that needs to be updated might be larger for this
> case though...
>
>
> -Matthias
>
>
>
> On 3/14/22 9:21 PM, Hao Li wrote:
> > Thanks Guozhang!
> >
> > 1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
> > fewer places. What do you think of option 1, which doesn't change the current
> > `windowedBy` API but configures `EmitConfig` separately? The benefit of
> > option 1 is that if we need to configure something else later, we don't need to
> > pile it onto `windowedBy` but can add separate APIs.
> > 2. I added it to `Stores` mainly to conform to
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
> > But we can also create an internal API to do that without modifying
> > `Stores`.
> >
> > Hao
> >
> > On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang wrote:
> >
> >> Hello Hao,
> >>
> >> Thanks for the proposal, I have some preference among the options here so I
> >> will copy them here:
> >>
> >> I'm now thinking if it's better to not add this new config on each of the
> >> Window interfaces, but instead add that at the KGroupedStream#windowedBy
> >> function. Also instead of adding just a boolean flag, maybe we can add a
> >> Configured class like Grouped, Suppressed, etc., e.g. let's call it
> >> `Emitted`, which for now would just have a single construct,
> >> `Emitted.atWindowClose`, whose semantics is the same as emitFinal == true. I
> >> think the benefits are:
> >>
> >> 1) you do not need to modify multiple Window classes, but just overload one
> >> windowedBy function with a second param. This is less of a scope for now,
> >> and also more extensible for any future changes.
> >>
> >> 2) With a config interface, we maintain its extensibility as well as being
> >> able to reuse this Emitted interface for other operators if we wanted to
> >> expand to.
> >>
> >>
> >>
> >> So in general I'm leaning towards option 2). For that, some more detailed
> >> comments:
> >>
> >> 1) If we want to reuse that config object for other non-window stateful
> >> operations, I think naming it as `EmitConfig` is probably better than
> >> `WindowConfig`.
> >> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
> >> also

Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-15 Thread Matthias J. Sax

Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself already?


(On the other hand, if I think about it once more, maybe the 
`grace-period` is actually not a property of the window but a property 
of the aggregation operator? _thinking_)


From an API flow point of view, option 1 might not be desirable IMHO:

  stream
    .groupByKey()
    .windowBy(TimeWindow.ofSizeNoGrace(...))
    .emitFinal()
    .count()

The call to `emitFinal()` does not seem to be in the right place for this case?


Option 2 might work (I think we need to discuss a few details of the API 
though):


  stream
    .groupByKey()
    .windowBy(
      TimeWindow.ofSizeNoGrace(...),
      EmitConfig.emitFinal() // just made this up; it's not in the KIP
    )
    .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
unclear what API you have in mind? `EmitFinalConfig` has no public
constructor nor any builder method.



For option 3, I am not sure what you really have in mind. Can you give
a concrete example (similar to above) of how users would write their code?




Did you consider actually passing the `EmitConfig` into the
aggregation operator? In the end, it seems not to be a property of the
window definition or windowing step, but a property of the actual operator:


  stream
    .groupByKey()
    .windowBy(
      TimeWindow.ofSizeNoGrace(...)
    )
    .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this
case though...
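
A rough sketch of why the surface area grows with this variant: each aggregation
method would need its own overload that accepts the config, whereas the
windowing-step variants add a single method. The classes below
(`EmitConfigSketch`, `WindowedStreamSketch`) are hypothetical stand-ins, not
Kafka Streams types, and only two aggregation methods are modeled:

```java
import java.util.function.BinaryOperator;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final class OperatorLevelEmitDemo {
    public static void main(String[] args) {
        WindowedStreamSketch windowed = new WindowedStreamSketch();
        System.out.println(windowed.count(EmitConfigSketch.emitFinal()));
        System.out.println(windowed.reduce(Long::sum, EmitConfigSketch.emitFinal()));
    }
}

final class EmitConfigSketch {
    final boolean finalOnly;

    private EmitConfigSketch(boolean finalOnly) {
        this.finalOnly = finalOnly;
    }

    static EmitConfigSketch emitFinal() {
        return new EmitConfigSketch(true);
    }

    // Assumed name for the current emit-on-every-update behavior.
    static EmitConfigSketch emitEager() {
        return new EmitConfigSketch(false);
    }
}

/** Stand-in for a windowed stream whose operators take the emit config. */
final class WindowedStreamSketch {
    // Existing method keeps its behavior by delegating with the default config.
    String count() {
        return count(EmitConfigSketch.emitEager());
    }

    // New overload #1.
    String count(EmitConfigSketch emit) {
        return describe("count", emit);
    }

    String reduce(BinaryOperator<Long> reducer) {
        return reduce(reducer, EmitConfigSketch.emitEager());
    }

    // New overload #2; the reducer is unused because this sketch only models signatures.
    String reduce(BinaryOperator<Long> reducer, EmitConfigSketch emit) {
        return describe("reduce", emit);
    }

    private static String describe(String op, EmitConfigSketch emit) {
        return op + ": " + (emit.finalOnly ? "final only" : "every update");
    }
}
```

With more aggregation methods, and more existing overloads per method, the
number of new signatures grows accordingly.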



-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:

Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the current
`windowedBy` API but configures `EmitConfig` separately? The benefit of
option 1 is that if we need to configure something else later, we don't need to
pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang  wrote:


Hello Hao,

Thanks for the proposal, I have some preference among the options here so I
will copy them here:

I'm now thinking if it's better to not add this new config on each of the
Window interfaces, but instead add that at the KGroupedStream#windowedBy
function. Also instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc., e.g. let's call it
`Emitted`, which for now would just have a single construct,
`Emitted.atWindowClose`, whose semantics is the same as emitFinal == true. I
think the benefits are:

1) you do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is less of a scope for now,
and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being
able to reuse this Emitted interface for other operators if we wanted to
expand to.



So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it as `EmitConfig` is probably better than
`WindowConfig`.
2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
also proposing to add new stores into the public factory Stores, but it's
not included in the KIP. Is that intentional? Personally I think that
although we may eventually want to add a new store type to the public APIs,
for this KIP maybe we do not have to add them but can delay for later after
we've learned the best way to lay it out. LMK what you think.



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li  wrote:


Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced


This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there and I'm
looking forward to your feedback.

Thanks,
Hao




--
-- Guozhang






Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-14 Thread Hao Li
Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the current
`windowedBy` API but configures `EmitConfig` separately? The benefit of
option 1 is that if we need to configure something else later, we don't need to
pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang  wrote:

> Hello Hao,
>
> Thanks for the proposal, I have some preference among the options here so I
> will copy them here:
>
> I'm now thinking if it's better to not add this new config on each of the
> Window interfaces, but instead add that at the KGroupedStream#windowedBy
> function. Also instead of adding just a boolean flag, maybe we can add a
> Configured class like Grouped, Suppressed, etc., e.g. let's call it
> `Emitted`, which for now would just have a single construct,
> `Emitted.atWindowClose`, whose semantics is the same as emitFinal == true. I
> think the benefits are:
>
> 1) you do not need to modify multiple Window classes, but just overload one
> windowedBy function with a second param. This is less of a scope for now,
> and also more extensible for any future changes.
>
> 2) With a config interface, we maintain its extensibility as well as being
> able to reuse this Emitted interface for other operators if we wanted to
> expand to.
>
> 
>
> So in general I'm leaning towards option 2). For that, some more detailed
> comments:
>
> 1) If we want to reuse that config object for other non-window stateful
> operations, I think naming it as `EmitConfig` is probably better than
> `WindowConfig`.
> 2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
> also proposing to add new stores into the public factory Stores, but it's
> not included in the KIP. Is that intentional? Personally I think that
> although we may eventually want to add a new store type to the public APIs,
> for this KIP maybe we do not have to add them but can delay for later after
> we've learned the best way to lay it out. LMK what you think.
>
>
>
> Guozhang
>
>
>
> On Fri, Mar 11, 2022 at 2:13 PM Hao Li  wrote:
>
> > Hi Dev team,
> >
> > I'd like to start a discussion thread on Kafka Streams KIP-825:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >
> > This KIP aims to add new APIs to support outputting final aggregated
> > results for windowed aggregations. I listed several options there and I'm
> > looking forward to your feedback.
> >
> > Thanks,
> > Hao
> >
>
>
> --
> -- Guozhang
>


-- 
Thanks,
Hao


Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-14 Thread Guozhang Wang
Hello Hao,

Thanks for the proposal, I have some preference among the options here so I
will copy them here:

I'm now thinking if it's better to not add this new config on each of the
Window interfaces, but instead add that at the KGroupedStream#windowedBy
function. Also instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc., e.g. let's call it
`Emitted`, which for now would just have a single construct,
`Emitted.atWindowClose`, whose semantics is the same as emitFinal == true. I
think the benefits are:

1) you do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is less of a scope for now,
and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being
able to reuse this Emitted interface for other operators if we wanted to
expand to.
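
As a minimal sketch of what such a config class could look like (purely
illustrative: `Emitted` is only a proposed name here, and `onEachUpdate()` and
the internal `Mode` enum are assumptions added to show how it could be
extended):

```java
// Hypothetical sketch of the proposed config class; not an existing Kafka Streams type.
final class EmittedDemo {
    public static void main(String[] args) {
        Emitted config = Emitted.atWindowClose();
        System.out.println(config.emitFinal()); // prints "true"
    }
}

final class Emitted {
    // New modes can be added here later without changing any method
    // signature that already accepts an Emitted.
    private enum Mode { ON_EACH_UPDATE, AT_WINDOW_CLOSE }

    private final Mode mode;

    private Emitted(Mode mode) {
        this.mode = mode;
    }

    /** Same semantics as the boolean flag emitFinal == true. */
    static Emitted atWindowClose() {
        return new Emitted(Mode.AT_WINDOW_CLOSE);
    }

    /** Assumed factory for the current default behavior; not named in this thread. */
    static Emitted onEachUpdate() {
        return new Emitted(Mode.ON_EACH_UPDATE);
    }

    boolean emitFinal() {
        return mode == Mode.AT_WINDOW_CLOSE;
    }
}
```

Compared with a boolean parameter, the static factory keeps call sites readable
and leaves room for further modes.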



So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it as `EmitConfig` is probably better than
`WindowConfig`.
2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
also proposing to add new stores into the public factory Stores, but it's
not included in the KIP. Is that intentional? Personally I think that
although we may eventually want to add a new store type to the public APIs,
for this KIP maybe we do not have to add them but can delay for later after
we've learned the best way to lay it out. LMK what you think.



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li  wrote:

> Hi Dev team,
>
> I'd like to start a discussion thread on Kafka Streams KIP-825:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>
> This KIP aims to add new APIs to support outputting final aggregated
> results for windowed aggregations. I listed several options there and I'm
> looking forward to your feedback.
>
> Thanks,
> Hao
>


-- 
-- Guozhang


[DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-11 Thread Hao Li
Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there and I'm
looking forward to your feedback.

Thanks,
Hao