Hi Guozhang,

Thanks for the feedback.

1. I agree with having an `Emitted` control class with two static constructors
named `onWindowClose` and `onEachUpdate`.

2. For the API changes, I'm thinking of adding a new function called
`trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It takes
an `Emitted` config and returns the same stream. Example:

stream
  .groupBy(...)
  .windowedBy(...)
  .trigger(Emitted.onWindowClose())
  .count()

The benefits are:
  1. It's simple and avoids overloading existing functions like
`windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
`aggregate` functions, we would need to add it to all existing `count` and
`aggregate` overloads, which is a lot.
  2. It operates directly on the windowed KStream and tells how its output
should be configured. If we later need to add this to other types of
streams, we can reuse the same `trigger` API, whereas other types of
streams/tables may not have `aggregate` or `windowedBy` APIs, so attaching
it there would not be consistent.
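A minimal, self-contained Java sketch of points 1 and 2, assuming `Emitted` simply wraps an emit policy and `trigger` records it and returns the same stream. `Emitted.Policy` and `WindowedStreamSketch` are hypothetical stand-ins rather than Kafka Streams classes, and the default policy is an assumption:

```java
import java.util.Objects;

// Hypothetical sketch of the proposed `Emitted` control class;
// not an existing Kafka Streams API.
final class Emitted {
    enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Policy policy;

    private Emitted(Policy policy) {
        this.policy = Objects.requireNonNull(policy);
    }

    // Emit a single final result per window once the window closes.
    static Emitted onWindowClose() {
        return new Emitted(Policy.ON_WINDOW_CLOSE);
    }

    // Emit an updated result for every input record.
    static Emitted onEachUpdate() {
        return new Emitted(Policy.ON_EACH_UPDATE);
    }

    Policy policy() {
        return policy;
    }
}

// Toy stand-in for TimeWindowedKStream, only to show where `trigger` sits in
// the fluent chain. It returns the same stream so `count()` etc. can follow.
final class WindowedStreamSketch {
    private Emitted emitted = Emitted.onEachUpdate(); // assumed default

    WindowedStreamSketch trigger(Emitted emitted) {
        this.emitted = Objects.requireNonNull(emitted);
        return this;
    }

    Emitted emitted() {
        return emitted;
    }
}

public class EmittedSketch {
    public static void main(String[] args) {
        WindowedStreamSketch windowed =
            new WindowedStreamSketch().trigger(Emitted.onWindowClose());
        System.out.println(windowed.emitted().policy()); // ON_WINDOW_CLOSE
    }
}
```

Because `trigger` returns the same stream type, the existing `count`, `reduce` and `aggregate` signatures stay untouched, which is the first benefit listed above.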

Hao


On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Hao,
>
> I prefer option 2 over the other options mainly because the added
> config object could potentially be used in other operators as well (which
> are not necessarily windowed operators and hence should not have to be
> piggy-backed on `windowedBy`; that's also why I suggested not naming it
> `WindowConfig` but just `EmitConfig`).
>
> As for Matthias' question, I think the difference between the windowed
> aggregate operator and the stream-stream join operator is that, for the
> latter, we think emit-final is the only correct emitting policy, and
> hence we should not let users configure it. If users configured it to,
> e.g., emit eagerly, they might get the old spurious emitting behavior, which
> violates the semantics.
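A minimal single-key simulation of the two policies, to make the "spurious" intermediate results concrete. It assumes a tumbling window closes once stream time reaches window end plus grace, drops records for already-closed windows, and ignores record caching; `EmitPolicySimulation` and its `run` method are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EmitPolicySimulation {

    // Counts records per tumbling window for one key and returns the emitted
    // results as "windowStart=count" strings.
    static List<String> run(long[] timestamps, long size, long grace, boolean emitOnClose) {
        Map<Long, Long> openWindows = new TreeMap<>(); // windowStart -> count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);

            // Close every window whose end + grace has been reached; with the
            // emit-on-close policy, this is the only point where results are emitted.
            Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> window = it.next();
                if (window.getKey() + size + grace <= streamTime) {
                    if (emitOnClose) {
                        emitted.add(window.getKey() + "=" + window.getValue());
                    }
                    it.remove();
                }
            }

            long start = ts - Math.floorMod(ts, size);
            if (start + size + grace <= streamTime) {
                continue; // late record: its window is already closed
            }
            long count = openWindows.merge(start, 1L, Long::sum);
            if (!emitOnClose) {
                emitted.add(start + "=" + count); // eager: emit every update
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 3, 12, 4, 25, 2};
        System.out.println(run(timestamps, 10, 5, false)); // [0=1, 0=2, 10=1, 0=3, 20=1]
        System.out.println(run(timestamps, 10, 5, true));  // [0=3, 10=1]
    }
}
```

With window size 10 and grace 5, eager emission produces five updates, including the superseded `0=1` and `0=2`, while emit-on-close produces only `0=3` and `10=1`; window 20 is still open, and the late record at timestamp 2 is dropped.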
>
> For option 2) itself, I have a few more thoughts:
>
> 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> towards adding the new param to the overloaded `aggregate` rather than to
> the overloaded `windowedBy` function. The reason is that the emitting logic
> could be either window-based or non-window-based in the long run. Though for
> this KIP we could just add it to `XXXWindowedKStream.aggregate()`, we may
> want to extend it to other non-windowed operators in the future.
> 2. To be consistent with other control class names, I feel maybe we can
> name it "Emitted", not "EmitConfig".
> 3. Following the first comment, I think we can name the static
> constructors "onWindowClose" and "onEachUpdate".
>
> The resulting code pattern would look like this:
>
>    stream
>      .groupBy(...)
>      .windowedBy(TimeWindows...)
>      .count(Emitted.onWindowClose())
>
> WDYT?
>
>
> On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > >> `allowedLateness` may not be a good name. What I have in mind is to use
> > >> this to control how frequently we try to emit final results. Maybe it's
> > >> more flexible to expose it as a config property, since then we don't
> > >> need to recompile the DSL code to change it.
> >
> > I see; making it a config seems better. Frankly, I am not even sure
> > whether we need a config at all or whether we can just hard-code it. For
> > the stream-stream left/outer join fix, there is only an internal config
> > and no public config either.
> >
> > Option 1: is this your proposal?
> >
> >    stream
> >      .groupByKey()
> >      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >      .configure(EmitConfig.emitFinal())
> >      .count()
> >
> > It does not change my argument that it seems misplaced from an API
> > flow POV.
> >
> > Option 1 seems to be the least desirable to me.
> >
> > For options 2 and 3, I am not sure which one I like better. It might be
> > good if others could chime in, too. I think I slightly prefer option 2
> > over option 3.
> >
> >
> > -Matthias
> >
> > On 3/15/22 5:33 PM, Hao Li wrote:
> > > Thanks for the feedback, Matthias.
> > >
> > > `allowedLateness` may not be a good name. What I have in mind is to use
> > > this to control how frequently we try to emit final results. Maybe it's
> > > more flexible to expose it as a config property, since then we don't
> > > need to recompile the DSL code to change it.
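One way the emit frequency could be read from properties instead of the DSL, sketched under assumptions: the property key `emit.final.interval.ms`, its default value, and the `EmitIntervalSketch` class are hypothetical, not existing Kafka Streams configs:

```java
import java.util.Properties;

public class EmitIntervalSketch {
    // Hypothetical property key; not an existing Kafka Streams config.
    static final String EMIT_INTERVAL_MS = "emit.final.interval.ms";
    static final long DEFAULT_EMIT_INTERVAL_MS = 1000L; // assumed default

    private final long intervalMs;
    private long lastAttemptMs = Long.MIN_VALUE; // no attempt yet

    EmitIntervalSketch(Properties props) {
        this.intervalMs = Long.parseLong(
            props.getProperty(EMIT_INTERVAL_MS, Long.toString(DEFAULT_EMIT_INTERVAL_MS)));
    }

    // Returns true when enough time has passed since the last attempt to scan
    // for closed windows and emit their final results.
    boolean shouldTryEmit(long nowMs) {
        if (lastAttemptMs == Long.MIN_VALUE || nowMs - lastAttemptMs >= intervalMs) {
            lastAttemptMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(EMIT_INTERVAL_MS, "500"); // tuned without touching DSL code
        EmitIntervalSketch throttle = new EmitIntervalSketch(props);
        System.out.println(throttle.shouldTryEmit(0));   // true
        System.out.println(throttle.shouldTryEmit(200)); // false
        System.out.println(throttle.shouldTryEmit(600)); // true
    }
}
```

Keeping the interval out of the topology means it can be changed by restarting with new properties rather than rebuilding the application.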
> > >
> > > For option 1, I intend to use `emitFinal` to configure how
> > > `TimeWindowedKStream` should be output to a `KTable` after aggregation.
> > > But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
> > > Maybe adding `configure(EmitConfig config)` makes more sense?
> > >
> > > For option 2, the config can be created using `WindowConfig.emitFinal()`
> > > or `EmitConfig.emitFinal()`.
> > >
> > > For option 3, it will be something like `TimeWindows(..., EmitConfig
> > > emitConfig)`.
> > >
> > > As for putting `EmitConfig` in the aggregation operator, I think it
> > > doesn't control how we do the aggregation but how we output to the
> > > `KTable`. That's why I feel option 1 makes more sense, as it applies to
> > > `TimeWindowedKStream`. But I'm also OK with option 2.
> > >
> > > Hao
> > >
> > >
> > >
> > >
> > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > >> Thanks for the KIP.
> > >>
> > >> A general comment: it seems that we won't need any new `allowedLateness`
> > >> parameter, because the grace period is already defined on the window
> > >> itself?
> > >>
> > >> (On the other hand, if I think about it once more, maybe the
> > >> `grace-period` is actually not a property of the window but a property
> > >> of the aggregation operator? _thinking_)
> > >>
> > >> From an API flow point of view, option 1 might not be desirable IMHO:
> > >>
> > >>     stream
> > >>       .groupByKey()
> > >>       .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> > >>       .emitFinal()
> > >>       .count()
> > >>
> > >> The call to `emitFinal()` does not seem to be in the right place for
> > >> this case?
> > >>
> > >>
> > >> Option 2 might work (I think we need to discuss a few details of the API
> > >> though):
> > >>
> > >>     stream
> > >>       .groupByKey()
> > >>       .windowedBy(
> > >>         TimeWindows.ofSizeWithNoGrace(...),
> > >>         EmitConfig.emitFinal() // just made this up; it's not in the KIP
> > >>       )
> > >>       .count()
> > >>
> > >> I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
> > >> unclear what API you have in mind. `EmitFinalConfig` has no public
> > >> constructor nor any builder method.
> > >>
> > >>
> > >> For option 3, I am not sure what you really have in mind. Can you give
> > >> a concrete example (similar to above) of how users would write their
> > >> code?
> > >>
> > >>
> > >>
> > >> Did you consider actually passing the `EmitConfig` into the
> > >> aggregation operator? In the end, it seems not to be a property of the
> > >> window definition or the windowing step, but a property of the actual
> > >> operator:
> > >>
> > >>     stream
> > >>       .groupByKey()
> > >>       .windowedBy(
> > >>         TimeWindows.ofSizeWithNoGrace(...)
> > >>       )
> > >>       .count(EmitConfig.emitFinal())
> > >>
> > >> The API surface area that needs to be updated might be larger in this
> > >> case, though...
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 3/14/22 9:21 PM, Hao Li wrote:
> > >>> Thanks Guozhang!
> > >>>
> > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and that option 2
> > >>> modifies fewer places. What do you think of option 1, which doesn't
> > >>> change the current `windowedBy` API but configures `EmitConfig`
> > >>> separately? The benefit of option 1 is that if we need to configure
> > >>> something else later, we don't need to pile it onto `windowedBy` but can
> > >>> add separate APIs.
> > >>> 2. I added it to `Stores` mainly to conform to
> > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> > >>> But we can also create an internal API to do that without modifying
> > >>> `Stores`.
> > >>>
> > >>> Hao
> > >>>
> > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>>> Hello Hao,
> > >>>>
> > >>>> Thanks for the proposal. I have some preferences among the options, so
> > >>>> I will copy them here:
> > >>>>
> > >>>> I'm now thinking it may be better not to add this new config to each of
> > >>>> the Window interfaces, but instead to add it to the
> > >>>> KGroupedStream#windowedBy function. Also, instead of adding just a
> > >>>> boolean flag, maybe we can add a config class like Grouped, Suppressed,
> > >>>> etc.; let's call it Emitted, which for now would just have a single
> > >>>> constructor, Emitted.atWindowClose, whose semantics are the same as
> > >>>> emitFinal == true. I think the benefits are:
> > >>>>
> > >>>> 1) You do not need to modify multiple Window classes, but just overload
> > >>>> one windowedBy function with a second param. This is a smaller scope
> > >>>> for now, and also more extensible for any future changes.
> > >>>>
> > >>>> 2) With a config interface, we maintain its extensibility as well as
> > >>>> being able to reuse this Emitted interface for other operators if we
> > >>>> want to expand to them.
> > >>>>
> > >>>> ----------------------------
> > >>>>
> > >>>> So in general I'm leaning towards option 2). For that, some more
> > >>>> detailed comments:
> > >>>>
> > >>>> 1) If we want to reuse that config object for other non-window stateful
> > >>>> operations, I think naming it `EmitConfig` is probably better than
> > >>>> `WindowConfig`.
> > >>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892), where you
> > >>>> are also proposing to add new stores to the public factory Stores, but
> > >>>> that's not included in the KIP. Is that intentional? Personally, I think
> > >>>> that although we may eventually want to add a new store type to the
> > >>>> public APIs, for this KIP we do not have to add them but can defer that
> > >>>> until we've learned the best way to lay it out. LMK what you think.
> > >>>>
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> > >>>>
> > >>>>> Hi Dev team,
> > >>>>>
> > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> > >>>>>
> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> > >>>>>
> > >>>>> This KIP aims to add new APIs to support outputting final aggregated
> > >>>>> results for windowed aggregations. I listed several options there, and
> > >>>>> I'm looking forward to your feedback.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Hao
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> > >
> >
>
>
> --
> -- Guozhang
>


-- 
Thanks,
Hao
