I think the following case is only doable via `suppress`:

stream
  .groupBy(..)
  .windowedBy(..)
  .aggregate(..)   // results in a KTable<Windowed<..>, ..>
  .mapValues(..)
  .suppress(Suppressed.untilWindowClose(..))   // valid, since we can trace back to the parent node to find a window definition
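The suppress case above relies on walking from the `suppress` node back up through its parents until a windowing step is found. Below is a minimal plain-Java sketch of that check. It assumes a hypothetical `Node` type with a parent pointer and an optional window size; `Node`, `SuppressValidation` and their methods are illustrative names only, not Kafka Streams' internal topology classes.

```java
import java.util.Optional;

final class SuppressValidation {
    // Walk up the parent chain until a node that defines a window is found.
    static Optional<Long> findWindowSize(Node node) {
        for (Node n = node; n != null; n = n.parent) {
            if (n.windowSizeMs != null) {
                return Optional.of(n.windowSizeMs);
            }
        }
        return Optional.empty();
    }

    // An "until window close" policy needs an upstream window definition;
    // without one there is no window end to wait for, so reject it.
    static void checkUntilWindowClose(Node suppressParent) {
        if (!findWindowSize(suppressParent).isPresent()) {
            throw new IllegalArgumentException(
                "untilWindowClose needs a windowed upstream; none found above '"
                    + suppressParent.name + "'");
        }
    }

    public static void main(String[] args) {
        Node source = new Node("stream", null, null);
        // Mirrors stream.groupBy(..).windowedBy(..).aggregate(..).mapValues(..)
        Node windowed = source.then("groupBy").windowedBy(60_000L)
            .then("aggregate").then("mapValues");
        checkUntilWindowClose(windowed); // accepted: a window is defined upstream
        System.out.println("window size found: " + findWindowSize(windowed).get());

        // Same chain without windowedBy(..): the check rejects it.
        Node plain = source.then("groupBy").then("aggregate").then("mapValues");
        try {
            checkUntilWindowClose(plain);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

// Hypothetical, heavily simplified topology node (not a Kafka Streams class).
final class Node {
    final String name;
    final Node parent;        // null for the source node
    final Long windowSizeMs;  // non-null only for a windowing step

    Node(String name, Node parent, Long windowSizeMs) {
        this.name = name;
        this.parent = parent;
        this.windowSizeMs = windowSizeMs;
    }

    Node then(String childName) {
        return new Node(childName, this, null);
    }

    Node windowedBy(long sizeMs) {
        return new Node("windowedBy", this, sizeMs);
    }
}
```

The same walk explains why the check can live inside `suppress` itself: any `KTable` can be suppressed, and only policies such as "until window close" need to find a window definition upstream.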


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:

> Thanks, Guozhang!
>
> To clarify, I was asking specifically about deprecating just the method
> ‘untilWindowClose’. I might not be thinking clearly about it, though. What
> does untilWindowClose do that this KIP doesn’t cover?
>
> Thanks,
> John
>
> On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> > Just my 2c: `Suppressed` is used in `suppress`, whose application scope is
> > much larger and hence more flexible, i.e. it can be used anywhere on a
> > `KTable` (though internally we would check whether certain emit policies
> > like `untilWindowClose` are valid or not), whereas `trigger`, as of now, is
> > only applicable to XXWindowedKStream. So I think it would not completely
> > replace Suppressed.untilWindowClose.
> >
> > In the future, I'd personally still like to keep a single control object
> > for all emit policies. If we extend Emitted to cover the other emitting
> > policies that Suppressed handles today, we can discuss whether
> > `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)`
> > as a whole, but for this KIP I think it's too early.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the KIP, Hao!
> >>
> >> For what it’s worth, I’m also in favor of your latest framing of the API.
> >>
> >> I think the name is fine. I assume it’s inspired by Flink? It’s not
> >> identical to the concept of a trigger in Flink, which specifies when to
> >> evaluate the window, so it might be confusing to people with deep Flink
> >> experience. Then again, it seems close enough that it should be clear to
> >> casual Flink users. For people with no other stream processing experience,
> >> it might seem a bit esoteric compared to something self-documenting like
> >> ‘emit()’, but the docs should make it clear.
> >>
> >> One small question: this proposal seems identical to
> >> Suppressed.untilWindowClose, and the KIP states that this API is superior.
> >> In that case, should we deprecate Suppressed.untilWindowClose?
> >>
> >> Thanks,
> >> John
> >>
> >> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
> >> > Hi Hao,
> >> >
> >> > For 2), I think it's generally a good idea to add a separate function on
> >> > the Time/SessionWindowedKStream itself rather than overloading existing
> >> > functions. That way we also get the effect that, for now, the emitting
> >> > control applies only to windowed aggregations, as in this KIP. We can
> >> > discuss the actual function names further, e.g. whether others like the
> >> > name `trigger`. Personally I feel `trigger` is a good one, but I'd like
> >> > to see whether others have opinions as well.
> >> >
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Thanks for the feedback.
> >> >>
> >> >> 1. I agree with having an `Emitted` control class with two static
> >> >> constructors named `onWindowClose` and `onEachUpdate`.
> >> >>
> >> >> 2. For the API function changes, I'm thinking of adding a new function
> >> >> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`.
> >> >> It takes an `Emitted` config and returns the same stream. Example:
> >> >>
> >> >> stream
> >> >>   .groupBy(...)
> >> >>   .windowedBy(...)
> >> >>   .trigger(Emitted.onWindowClose())   // new function
> >> >>   .count()
> >> >>
> >> >> The benefits are:
> >> >>   1. It's simple and avoids overloading existing functions like
> >> >> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
> >> >> `aggregate` functions, we would need to add it to all existing `count`
> >> >> and `aggregate` overloads, which is a lot.
> >> >>   2. It operates directly on the windowed KStream and specifies how its
> >> >> output should be configured. If we later need to add this to other types
> >> >> of streams, we can reuse the same `trigger` API, whereas other types of
> >> >> streams/tables may not have `aggregate` or `windowedBy` APIs that would
> >> >> keep it consistent.
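A rough plain-Java sketch of the proposed shape: an `Emitted` control class with the two static constructors named above, and a `trigger(Emitted)` call that records the policy and returns the same stream. `Emitted` and `trigger` are only proposals in this thread, not existing Kafka Streams APIs; `ToyWindowedStream`, `TriggerDemo`, the `Policy` enum and the `onEachUpdate` default are hypothetical stand-ins chosen for illustration.

```java
import java.util.Objects;

final class TriggerDemo {
    public static void main(String[] args) {
        ToyWindowedStream stream = new ToyWindowedStream();
        // Mirrors ...windowedBy(...).trigger(Emitted.onWindowClose()).count()
        System.out.println(stream.trigger(Emitted.onWindowClose()).count());
    }
}

// Hypothetical sketch of the proposed control class (not an existing Kafka Streams API).
final class Emitted {
    enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Policy policy;

    private Emitted(Policy policy) {
        this.policy = Objects.requireNonNull(policy);
    }

    // Static constructors named as proposed in the thread.
    static Emitted onWindowClose() {
        return new Emitted(Policy.ON_WINDOW_CLOSE);
    }

    static Emitted onEachUpdate() {
        return new Emitted(Policy.ON_EACH_UPDATE);
    }

    Policy policy() {
        return policy;
    }
}

// Toy stand-in for TimeWindowedKStream/SessionWindowedKStream: trigger(..)
// records the policy and returns the same stream, so the existing count()/
// aggregate()/reduce() overloads need no extra Emitted parameter.
final class ToyWindowedStream {
    // Assumed default: emit on each update, i.e. today's eager behavior.
    private Emitted emitted = Emitted.onEachUpdate();

    ToyWindowedStream trigger(Emitted emitted) {
        this.emitted = Objects.requireNonNull(emitted);
        return this;
    }

    // Stand-in for count(): just reports which policy would be applied.
    String count() {
        return "count emitting " + emitted.policy();
    }
}
```

The point of the design, as argued above, is that only one new method is added per windowed-stream type, instead of one new overload per existing `count`/`aggregate`/`reduce` variant.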
> >> >>
> >> >> Hao
> >> >>
> >> >>
> >> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >>
> >> >> > Hello Hao,
> >> >> >
> >> >> > I prefer option 2 over the other options mainly because the added
> >> >> > config object could potentially be used in other operators as well
> >> >> > (not necessarily windowed ones, so it need not be piggy-backed on
> >> >> > `windowedBy`; that's also why I suggested naming it `EmitConfig`
> >> >> > rather than `WindowConfig`).
> >> >> >
> >> >> > As for Matthias' question, I think the difference between the windowed
> >> >> > aggregate operator and the stream-stream join operator is that, for the
> >> >> > latter, we think emit-final should be the only correct emitting policy,
> >> >> > and hence we should not let users configure it. If users configured it
> >> >> > to, e.g., emit eagerly, they might get the old spurious emitting
> >> >> > behavior, which violates the semantics.
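To make the emit-final vs. eager distinction concrete for the windowed-aggregation case, here is a simplified single-key simulation of a tumbling-window count. It assumes a window [start, start + size) closes once stream time (the largest timestamp seen so far) reaches start + size + grace, and that records for already-closed windows are dropped. `EmitPolicySimulation` and `run` are hypothetical names; this sketches the semantics under those assumptions and is not Kafka Streams' implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class EmitPolicySimulation {
    // Returns emitted results as "windowStart=count" strings.
    static List<String> run(long[] timestamps, long sizeMs, long graceMs, boolean emitFinal) {
        TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> count, ordered by start
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % sizeMs); // tumbling window; assumes ts >= 0
            if (start + sizeMs + graceMs <= streamTime) {
                continue; // window already closed: late record is dropped
            }
            long updated = counts.merge(start, 1L, Long::sum);

            if (!emitFinal) {
                out.add(start + "=" + updated); // eager: forward every update
                continue;
            }
            // Final: forward each window exactly once, after it has closed.
            Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Long> e = it.next();
                if (e.getKey() + sizeMs + graceMs > streamTime) {
                    break; // this and all later windows are still open
                }
                out.add(e.getKey() + "=" + e.getValue());
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1, 3, 12, 5, 25}; // 5 arrives out of order, but within grace
        System.out.println("onEachUpdate:  " + run(ts, 10, 5, false));
        System.out.println("onWindowClose: " + run(ts, 10, 5, true));
    }
}
```

With timestamps {1, 3, 12, 5, 25}, size 10 and grace 5, the eager run emits [0=1, 0=2, 10=1, 0=3, 20=1], including intermediate counts for window 0, while the final run emits only [0=3, 10=1]; window 20 is still open.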
> >> >> >
> >> >> > For option 2) itself, I have a few more thoughts:
> >> >> >
> >> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit
> >> >> > towards adding the new param in the overloaded `aggregate` rather than
> >> >> > the overloaded `windowedBy` function. The reason is that, in the long
> >> >> > run, the emitting logic could be either window-based or
> >> >> > non-window-based. Though for this KIP we could just add it to
> >> >> > `XXXWindowedKStream.aggregate()`, we may want to extend it to other
> >> >> > non-windowed operators in the future.
> >> >> > 2. To be consistent with other control class names, maybe we can name
> >> >> > it "Emitted", not "EmitConfig".
> >> >> > 3. Following the first comment, I think we can name the static
> >> >> > constructors "onWindowClose" and "onEachUpdate".
> >> >> >
> >> >> > The resulting code pattern would look like this:
> >> >> >
> >> >> >    stream
> >> >> >      .groupBy(..)
> >> >> >      .windowedBy(TimeWindows..)
> >> >> >      .count(Emitted.onWindowClose())
> >> >> >
> >> >> > WDYT?
> >> >> >
> >> >> >
> >> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
> >> >> >
> >> >> > > >> `allowedLateness` may not be a good name. What I have in mind is to
> >> >> > > >> use this to control how frequently we try to emit final results.
> >> >> > > >> Maybe it's more flexible to make it a config in properties, as then
> >> >> > > >> we don't need to recompile the DSL to change it.
> >> >> > >
> >> >> > > I see; making it a config seems better. Frankly, I am not even sure
> >> >> > > whether we need a config at all, or whether we can just hard-code it.
> >> >> > > For the stream-stream left/outer join fix, there is only an internal
> >> >> > > config but no public config either.
> >> >> > >
> >> >> > > Option 1: Is this your proposal?
> >> >> > >
> >> >> > >    stream
> >> >> > >      .groupByKey()
> >> >> > >      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >> >> > >      .configure(EmitConfig.emitFinal())
> >> >> > >      .count()
> >> >> > >
> >> >> > > This does not change my argument that it seems misplaced from an
> >> >> > > API-flow point of view.
> >> >> > >
> >> >> > > Option 1 seems to be the least desirable to me.
> >> >> > >
> >> >> > > For options 2 and 3, I am not sure which one I like better. It might
> >> >> > > be good if others could chime in, too. I think I slightly prefer
> >> >> > > option 2 over option 3.
> >> >> > >
> >> >> > >
> >> >> > > -Matthias
> >> >> > >
> >> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
> >> >> > > > Thanks for the feedback, Matthias.
> >> >> > > >
> >> >> > > > `allowedLateness` may not be a good name. What I have in mind is to
> >> >> > > > use this to control how frequently we try to emit final results.
> >> >> > > > Maybe it's more flexible to make it a config in properties, as then
> >> >> > > > we don't need to recompile the DSL to change it.
> >> >> > > >
> >> >> > > > For option 1, I intend to use `emitFinal` to configure how the
> >> >> > > > `TimeWindowedKStream` should be output to a `KTable` after
> >> >> > > > aggregation. But `emitFinal` is not an action on the
> >> >> > > > `TimeWindowedKStream` interface. Maybe adding
> >> >> > > > `configure(EmitConfig config)` makes more sense?
> >> >> > > >
> >> >> > > > For option 2, the config can be created using
> >> >> > > > `WindowConfig.emitFinal()` or `EmitConfig.emitFinal()`.
> >> >> > > >
> >> >> > > > For option 3, it will be something like
> >> >> > > > `TimeWindows(..., EmitConfig emitConfig)`.
> >> >> > > >
> >> >> > > > As for putting `EmitConfig` in the aggregation operator, I think it
> >> >> > > > doesn't control how we do the aggregation but how we output to the
> >> >> > > > `KTable`. That's why I feel option 1 makes more sense, as it applies
> >> >> > > > to `TimeWindowedKStream`. But I'm also OK with option 2.
> >> >> > > >
> >> >> > > > Hao
> >> >> > > >
> >> >> > > >
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
> >> >> > > >
> >> >> > > >> Thanks for the KIP.
> >> >> > > >>
> >> >> > > >> A general comment: it seems that we won't need any new
> >> >> > > >> `allowedLateness` parameter, because the grace period is already
> >> >> > > >> defined on the window itself?
> >> >> > > >>
> >> >> > > >> (On the other hand, thinking about it once more, maybe the
> >> >> > > >> `grace-period` is actually not a property of the window but a
> >> >> > > >> property of the aggregation operator? _thinking_)
> >> >> > > >>
> >> >> > > >> From an API flow point of view, option 1 might not be desirable IMHO:
> >> >> > > >>
> >> >> > > >>     stream
> >> >> > > >>       .groupByKey()
> >> >> > > >>       .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
> >> >> > > >>       .emitFinal()
> >> >> > > >>       .count()
> >> >> > > >>
> >> >> > > >> The call to `emitFinal()` does not seem to be in the right place
> >> >> > > >> for this case?
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> Option 2 might work (though I think we need to discuss a few details
> >> >> > > >> of the API):
> >> >> > > >>
> >> >> > > >>     stream
> >> >> > > >>       .groupByKey()
> >> >> > > >>       .windowedBy(
> >> >> > > >>         TimeWindows.ofSizeWithNoGrace(...),
> >> >> > > >>         EmitConfig.emitFinal()   // just made this up; it's not in the KIP
> >> >> > > >>       )
> >> >> > > >>       .count()
> >> >> > > >>
> >> >> > > >> I made up the `WindowConfig.emitFinal()` call; from the KIP, it's
> >> >> > > >> unclear what API you have in mind. `EmitFinalConfig` has neither a
> >> >> > > >> public constructor nor any builder method.
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> For option 3, I am not sure what you really have in mind. Can you
> >> >> > > >> give a concrete example (similar to the above) of how users would
> >> >> > > >> write their code?
> >> >> > > >>
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> Did you consider actually passing the `EmitConfig` into the
> >> >> > > >> aggregation operator? In the end, it seems to be not a property of
> >> >> > > >> the window definition or the windowing step, but a property of the
> >> >> > > >> actual operator:
> >> >> > > >>
> >> >> > > >>     stream
> >> >> > > >>       .groupByKey()
> >> >> > > >>       .windowedBy(
> >> >> > > >>         TimeWindows.ofSizeWithNoGrace(...)
> >> >> > > >>       )
> >> >> > > >>       .count(EmitConfig.emitFinal())
> >> >> > > >>
> >> >> > > >> The API surface area that needs to be updated might be larger for
> >> >> > > >> this case, though...
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> -Matthias
> >> >> > > >>
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
> >> >> > > >>> Thanks Guozhang!
> >> >> > > >>>
> >> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
> >> >> > > >>> modifies fewer places. What do you think of option 1, which doesn't
> >> >> > > >>> change the current `windowedBy` API but configures `EmitConfig`
> >> >> > > >>> separately? The benefit of option 1 is that if we need to configure
> >> >> > > >>> something else later, we don't need to pile it onto `windowedBy`
> >> >> > > >>> but can add separate APIs.
> >> >> > > >>> 2. I added it to `Stores` mainly to conform to the following code:
> >> >> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
> >> >> > > >>> But we can also create an internal API to do that without modifying
> >> >> > > >>> `Stores`.
> >> >> > > >>> Hao
> >> >> > > >>>
> >> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> >> > > >>>
> >> >> > > >>>> Hello Hao,
> >> >> > > >>>>
> >> >> > > >>>> Thanks for the proposal. I have some preferences among the options,
> >> >> > > >>>> which I will copy here:
> >> >> > > >>>>
> >> >> > > >>>> I'm now thinking it may be better not to add this new config to each
> >> >> > > >>>> of the Window interfaces, but instead to add it to the
> >> >> > > >>>> KGroupedStream#windowedBy function. Also, instead of adding just a
> >> >> > > >>>> boolean flag, maybe we can add a Configured class like Grouped,
> >> >> > > >>>> Suppressed, etc.; e.g., let's call it Emitted, which for now would
> >> >> > > >>>> have just a single constructor, Emitted.atWindowClose, whose
> >> >> > > >>>> semantics are the same as emitFinal == true. I think the benefits
> >> >> > > >>>> are:
> >> >> > > >>>>
> >> >> > > >>>> 1) You do not need to modify multiple Window classes, but just
> >> >> > > >>>> overload one windowedBy function with a second param. This is a
> >> >> > > >>>> smaller scope for now, and also more extensible for any future
> >> >> > > >>>> changes.
> >> >> > > >>>>
> >> >> > > >>>> 2) With a config interface, we maintain its extensibility as well as
> >> >> > > >>>> being able to reuse this Emitted interface for other operators if we
> >> >> > > >>>> want to expand to them.
> >> >> > > >>>>
> >> >> > > >>>> ----------------------------
> >> >> > > >>>>
> >> >> > > >>>> So in general I'm leaning towards option 2). For that, some more
> >> >> > > >>>> detailed comments:
> >> >> > > >>>>
> >> >> > > >>>> 1) If we want to reuse that config object for other non-window
> >> >> > > >>>> stateful operations, I think naming it `EmitConfig` is probably
> >> >> > > >>>> better than `WindowConfig`.
> >> >> > > >>>> 2) I saw your PR (https://github.com/apache/kafka/pull/11892), in
> >> >> > > >>>> which you also propose adding new stores to the public factory
> >> >> > > >>>> Stores, but that's not included in the KIP. Is that intentional?
> >> >> > > >>>> Personally, I think that although we may eventually want to add a
> >> >> > > >>>> new store type to the public APIs, for this KIP we may not have to
> >> >> > > >>>> add them and can delay that until we've learned the best way to lay
> >> >> > > >>>> it out. LMK what you think?
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> Guozhang
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
> >> >> > > >>>>
> >> >> > > >>>>> Hi Dev team,
> >> >> > > >>>>>
> >> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
> >> >> > > >>>>>
> >> >> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> >> >> > > >>>>>
> >> >> > > >>>>> This KIP aims to add new APIs to support outputting final
> >> >> > > >>>>> aggregated results for windowed aggregations. I listed several
> >> >> > > >>>>> options there, and I'm looking forward to your feedback.
> >> >> > > >>>>>
> >> >> > > >>>>> Thanks,
> >> >> > > >>>>> Hao
> >> >> > > >>>>>
> >> >> > > >>>>
> >> >> > > >>>>
> >> >> > > >>>> --
> >> >> > > >>>> -- Guozhang
> >> >> > > >>>>
> >> >> > > >>>
> >> >> > > >>>
> >> >> > > >>
> >> >> > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > -- Guozhang
> >> >> >
> >> >>
> >> >>
> >> >> --
> >> >> Thanks,
> >> >> Hao
> >> >>
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang
