Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method 
‘untilWindowClose’. I might not be thinking clearly about it, though. What does 
untilWindowClose do that this KIP doesn’t cover?
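
For reference, here is a minimal, self-contained sketch (plain Java, not Kafka Streams code) of the two emit policies discussed in this thread: emitting on each update versus emitting once on window close, which is what `untilWindowClose`/`onWindowClose` aim for. The names `EmitPolicy`, `WindowedCountModel` and `EmitPolicyDemo` are hypothetical. It models a single-key tumbling-window count and assumes, as a simplification, that a window [start, start + size) closes once observed stream time reaches start + size + grace, after which late records for it are dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the onEachUpdate / onWindowClose policies;
// this is NOT the Kafka Streams API.
enum EmitPolicy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

// Simplified single-key tumbling-window count. Assumption: a window
// [start, start + size) is closed once stream time >= start + size + grace.
class WindowedCountModel {
    private final long sizeMs;
    private final long graceMs;
    private final EmitPolicy policy;
    // window start -> running count; TreeMap keeps windows ordered by start
    private final TreeMap<Long, Long> counts = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE; // max record timestamp seen so far
    final List<String> emitted = new ArrayList<>(); // results as "windowStart:count"

    WindowedCountModel(long sizeMs, long graceMs, EmitPolicy policy) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
        this.policy = policy;
    }

    void process(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - (timestamp % sizeMs); // assumes timestamp >= 0
        if (start + sizeMs + graceMs <= streamTime) {
            return; // window already closed: the late record is dropped
        }
        long count = counts.merge(start, 1L, Long::sum);
        if (policy == EmitPolicy.ON_EACH_UPDATE) {
            emitted.add(start + ":" + count); // eager: every update goes downstream
        }
        closeExpiredWindows();
    }

    private void closeExpiredWindows() {
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> window = it.next();
            if (window.getKey() + sizeMs + graceMs > streamTime) {
                break; // windows are ordered by start, so later ones are still open
            }
            if (policy == EmitPolicy.ON_WINDOW_CLOSE) {
                emitted.add(window.getKey() + ":" + window.getValue()); // final result
            }
            it.remove(); // a closed window no longer needs state
        }
    }
}

class EmitPolicyDemo {
    public static void main(String[] args) {
        long[] timestamps = {1, 4, 12, 16, 3, 21, 30}; // t=3 arrives late
        for (EmitPolicy policy : EmitPolicy.values()) {
            WindowedCountModel model = new WindowedCountModel(10, 5, policy);
            for (long ts : timestamps) {
                model.process(ts);
            }
            System.out.println(policy + " -> " + model.emitted);
        }
    }
}
```

With size 10 and grace 5, `EmitPolicyDemo` prints `ON_EACH_UPDATE -> [0:1, 0:2, 10:1, 10:2, 20:1, 30:1]` and `ON_WINDOW_CLOSE -> [0:2, 10:2]`: the eager policy forwards every intermediate count, the emit-final policy forwards one result per closed window (windows 20 and 30 are still open at stream time 30), and the late record at t=3 is dropped under both.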

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
> Just my 2c: `Suppressed` is used by `suppress`, whose application scope is
> much larger and hence more flexible. I.e., it can be used on any `KTable`
> (though internally we check whether certain emit policies like
> `untilWindowClose` are valid), whereas `trigger`, for now, is only
> applicable to XXWindowedKStream. So I think it would not completely
> replace Suppressed.untilWindowClose.
>
> In the future, I'd personally still want to keep a single control object
> for all emit policies. If we extend Emitted to cover the other emit
> policies that Suppressed covers today, we can discuss whether
> `KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)`
> as a whole, but for this KIP I think it's too early.
>
>
> Guozhang
>
>
> On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi all,
>>
>> Thanks for the KIP, Hao!
>>
>> For what it’s worth, I’m also in favor of your latest framing of the API.
>>
>> I think the name is fine. I assume it’s inspired by Flink? It’s not
>> identical to the concept of a trigger in Flink, which specifies when to
>> evaluate the window, so it might confuse some people who have deep
>> experience with Flink. Then again, it seems close enough that it should be
>> clear to casual Flink users. For people with no other stream processing
>> experience, it might seem a bit esoteric compared to something
>> self-documenting like ‘emit()’, but the docs should make it clear.
>>
>> One small question: it seems like this proposal is identical to
>> Suppressed.untilWindowClose, and the KIP states that this API is superior.
>> In that case, should we deprecate Suppressed.untilWindowClose?
>>
>> Thanks,
>> John
>>
>> On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
>> > Hi Hao,
>> >
>> > For 2), I think it's a good idea in general to add a separate function on
>> > the Time/SessionWindowedKStream itself rather than overloading existing
>> > functions, since it achieves the same effect: for now, emit control
>> > applies only to windowed aggregations, as in this KIP. We can discuss the
>> > actual function names further, e.g. whether others like the name
>> > `trigger` or not. Personally, I feel `trigger` is a good one, but I'd like
>> > to see if others have opinions as well.
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for the feedback.
>> >>
>> >> 1. I agree with having an `Emitted` control class with two static
>> >> constructors named `onWindowClose` and `onEachUpdate`.
>> >>
>> >> 2. For the API function changes, I'm thinking of adding a new function
>> >> called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
>> >> takes an `Emitted` config and returns the same stream. Example:
>> >>
>> >> stream
>> >>   .groupBy(...)
>> >>   .windowedBy(...)
>> >>   .trigger(Emitted.onWindowClose())
>> >>   .count()
>> >>
>> >> The benefits are:
>> >>   1. It's simple and avoids adding overloads of existing functions like
>> >> `windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
>> >> aggregation functions, we would need to add it to every existing `count`
>> >> and `aggregate` overload, which is a lot.
>> >>   2. It operates directly on the windowed KStream and specifies how its
>> >> output should be emitted. If we later need to add this to other types of
>> >> streams, we can reuse the same `trigger` API, whereas other types of
>> >> streams/tables may not have `aggregate` or `windowedBy` APIs to keep it
>> >> consistent.
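
To illustrate benefit 1, here is a hypothetical sketch of the proposed API shape in plain Java (not Kafka Streams code): an `Emitted` class with the `onWindowClose()`/`onEachUpdate()` static constructors named in this thread, and a stand-in `WindowedStreamSketch` whose `trigger` stores the policy and returns the same stream, so every aggregation reads it without needing its own `Emitted` overload. The `Policy` enum, the string return values and the `onEachUpdate()` default are assumptions made only for illustration.

```java
// Hypothetical sketch of the proposed control class; the names Emitted,
// onWindowClose() and onEachUpdate() come from this thread, not from a
// released Kafka Streams API.
final class Emitted {
    enum Policy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    final Policy policy;

    private Emitted(Policy policy) {
        this.policy = policy;
    }

    static Emitted onWindowClose() {
        return new Emitted(Policy.ON_WINDOW_CLOSE);
    }

    static Emitted onEachUpdate() {
        return new Emitted(Policy.ON_EACH_UPDATE);
    }
}

// Hypothetical stand-in for TimeWindowedKStream / SessionWindowedKStream.
final class WindowedStreamSketch {
    // Assumed default: eager emission, mirroring today's behavior.
    private Emitted emitted = Emitted.onEachUpdate();

    // Mirrors the proposal: takes an Emitted config and returns the same stream.
    WindowedStreamSketch trigger(Emitted emitted) {
        this.emitted = emitted;
        return this;
    }

    // Each aggregation reads the one stored policy instead of taking its own
    // Emitted parameter; the strings stand in for building the real operator.
    String count() {
        return "count/" + emitted.policy;
    }

    String reduce() {
        return "reduce/" + emitted.policy;
    }
}
```

For example, `new WindowedStreamSketch().trigger(Emitted.onWindowClose()).count()` returns `"count/ON_WINDOW_CLOSE"`. The trade-off this makes visible is the one debated above: one extra method on the windowed stream versus an extra parameter on every `count`/`reduce`/`aggregate` overload.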
>> >>
>> >> Hao
>> >>
>> >>
>> >> On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >>
>> >> > Hello Hao,
>> >> >
>> >> > I prefer option 2 over the other options, mainly because the added
>> >> > config object could potentially be used in other operators as well (not
>> >> > necessarily windowed ones, so it does not have to be piggy-backed on
>> >> > `windowedBy`; that's also why I suggested naming it `EmitConfig` rather
>> >> > than `WindowConfig`).
>> >> >
>> >> > As for Matthias' question, I think the difference between the windowed
>> >> > aggregate operator and the stream-stream join operator is that, for the
>> >> > latter, we think emit-final is the only correct emit policy, and hence
>> >> > we should not let users configure it. If users configured it to, e.g.,
>> >> > emit eagerly, they might get the old spurious emitting behavior, which
>> >> > violates the semantics.
>> >> >
>> >> > For option 2) itself, I have a few more thoughts:
>> >> >
>> >> > 1. Thinking about Matthias' suggestions, I'm also leaning a bit towards
>> >> > adding the new param to an overloaded `aggregate` rather than an
>> >> > overloaded `windowedBy` function. The reason is that, in the long run,
>> >> > the emitting logic could be either window-based or non-window-based.
>> >> > Though for this KIP we could just add it to
>> >> > `XXXWindowedKStream.aggregate()`, we may want to extend it to other
>> >> > non-windowed operators in the future.
>> >> > 2. To be consistent with other control class names, I feel we could name
>> >> > it "Emitted" rather than "EmitConfig".
>> >> > 3. Following the first comment, I think we can name the static
>> >> > constructors "onWindowClose" and "onEachUpdate".
>> >> >
>> >> > The resulting code pattern would look like this:
>> >> >
>> >> >    stream
>> >> >      .groupBy(..)
>> >> >      .windowedBy(TimeWindows..)
>> >> >      .count(Emitted.onWindowClose())
>> >> >
>> >> > WDYT?
>> >> >
>> >> >
>> >> > On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >> >
>> >> > > >> `allowedLateness` may not be a good name. What I have in mind is to use
>> >> > > >> this to control how frequently we try to emit final results. Maybe it's
>> >> > > >> more flexible as a config in the properties, since then we don't need to
>> >> > > >> recompile the DSL code to change it.
>> >> > >
>> >> > > I see; making it a config seems better. Frankly, I am not even sure
>> >> > > whether we need a config at all, or whether we can just hard-code it.
>> >> > > For the stream-stream left/outer join fix, there is only an internal
>> >> > > config and no public config either.
>> >> > >
>> >> > > Option 1: Is this your proposal?
>> >> > >
>> >> > >    stream
>> >> > >      .groupByKey()
>> >> > >      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>> >> > >      .configure(EmitConfig.emitFinal())
>> >> > >      .count()
>> >> > >
>> >> > > That does not change my argument that it seems misplaced from an API
>> >> > > flow POV.
>> >> > >
>> >> > > Option 1 seems to be the least desirable to me.
>> >> > >
>> >> > > For options 2 and 3, I'm not sure which one I like better. It might be
>> >> > > good if others could chime in, too. I think I slightly prefer option 2
>> >> > > over option 3.
>> >> > >
>> >> > >
>> >> > > -Matthias
>> >> > >
>> >> > > On 3/15/22 5:33 PM, Hao Li wrote:
>> >> > > > Thanks for the feedback, Matthias.
>> >> > > >
>> >> > > > `allowedLateness` may not be a good name. What I have in mind is to use
>> >> > > > this to control how frequently we try to emit final results. Maybe it's
>> >> > > > more flexible as a config in the properties, since then we don't need to
>> >> > > > recompile the DSL code to change it.
>> >> > > >
>> >> > > > For option 1, I intend to use `emitFinal` to configure how the
>> >> > > > `TimeWindowedKStream` should be output to a `KTable` after aggregation.
>> >> > > > But `emitFinal` is not an action on the `TimeWindowedKStream` interface.
>> >> > > > Maybe adding `configure(EmitConfig config)` makes more sense?
>> >> > > >
>> >> > > > For option 2, the config can be created using `WindowConfig.emitFinal()`
>> >> > > > or `EmitConfig.emitFinal()`.
>> >> > > >
>> >> > > > For option 3, it will be something like
>> >> > > > `TimeWindows(..., EmitConfig emitConfig)`.
>> >> > > >
>> >> > > > As for putting `EmitConfig` in the aggregation operator, I think it
>> >> > > > doesn't control how we aggregate but how we output to the `KTable`.
>> >> > > > That's why I feel option 1 makes more sense, as it applies to
>> >> > > > `TimeWindowedKStream`. But I'm also OK with option 2.
>> >> > > >
>> >> > > > Hao
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > > >
>> >> > > > On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >> > > >
>> >> > > >> Thanks for the KIP.
>> >> > > >>
>> >> > > >> A general comment: it seems that we won't need any new
>> >> > > >> `allowedLateness` parameter, because the grace period is already
>> >> > > >> defined on the window itself?
>> >> > > >>
>> >> > > >> (On the other hand, if I think about it once more, maybe the
>> >> > > >> `grace-period` is actually not a property of the window but a property
>> >> > > >> of the aggregation operator? _thinking_)
>> >> > > >>
>> >> > > >> From an API flow point of view, option 1 might not be desirable IMHO:
>> >> > > >>
>> >> > > >>     stream
>> >> > > >>       .groupByKey()
>> >> > > >>       .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
>> >> > > >>       .emitFinal()
>> >> > > >>       .count()
>> >> > > >>
>> >> > > >> The call to `emitFinal()` seems to be in the wrong place for this case?
>> >> > > >>
>> >> > > >>
>> >> > > >> Option 2 might work (I think we need to discuss a few details of the API
>> >> > > >> though):
>> >> > > >>
>> >> > > >>     stream
>> >> > > >>       .groupByKey()
>> >> > > >>       .windowedBy(
>> >> > > >>         TimeWindows.ofSizeWithNoGrace(...),
>> >> > > >>         EmitConfig.emitFinal() // just made this up; it's not in the KIP
>> >> > > >>       )
>> >> > > >>       .count()
>> >> > > >>
>> >> > > >> I made up the `WindowConfig.emitFinal()` call; from the KIP, it's
>> >> > > >> unclear what API you have in mind. `EmitFinalConfig` has neither a
>> >> > > >> public constructor nor any builder method.
>> >> > > >>
>> >> > > >>
>> >> > > >> For option 3, I am not sure what you really have in mind. Can you give
>> >> > > >> a concrete example (similar to the above) of how users would write
>> >> > > >> their code?
>> >> > > >>
>> >> > > >>
>> >> > > >>
>> >> > > >> Did you consider actually passing the `EmitConfig` into the
>> >> > > >> aggregation operator? In the end, it seems to be a property not of the
>> >> > > >> window definition or windowing step, but of the actual operator:
>> >> > > >>
>> >> > > >>     stream
>> >> > > >>       .groupByKey()
>> >> > > >>       .windowedBy(
>> >> > > >>         TimeWindows.ofSizeWithNoGrace(...)
>> >> > > >>       )
>> >> > > >>       .count(EmitConfig.emitFinal())
>> >> > > >>
>> >> > > >> The API surface area that needs to be updated might be larger for this
>> >> > > >> case, though...
>> >> > > >>
>> >> > > >>
>> >> > > >> -Matthias
>> >> > > >>
>> >> > > >>
>> >> > > >>
>> >> > > >> On 3/14/22 9:21 PM, Hao Li wrote:
>> >> > > >>> Thanks Guozhang!
>> >> > > >>>
>> >> > > >>> 1. I agree `EmitConfig` is better than `WindowConfig`, and option 2
>> >> > > >>> modifies fewer places. What do you think of option 1, which doesn't
>> >> > > >>> change the current `windowedBy` API but configures `EmitConfig`
>> >> > > >>> separately? The benefit of option 1 is that if we need to configure
>> >> > > >>> something else later, we don't need to pile it onto `windowedBy` but
>> >> > > >>> can add separate APIs.
>> >> > > >>> 2. I added it to `Stores` mainly to conform to
>> >> > > >>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
>> >> > > >>> But we can also create an internal API to do that without modifying
>> >> > > >>> `Stores`.
>> >> > > >>>
>> >> > > >>> Hao
>> >> > > >>>
>> >> > > >>> On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > > >>>
>> >> > > >>>> Hello Hao,
>> >> > > >>>>
>> >> > > >>>> Thanks for the proposal. I have some preferences among the options
>> >> > > >>>> here, so I will copy them here:
>> >> > > >>>>
>> >> > > >>>> I'm now wondering whether it's better not to add this new config to
>> >> > > >>>> each of the Window interfaces, but instead to add it to the
>> >> > > >>>> KGroupedStream#windowedBy function. Also, instead of adding just a
>> >> > > >>>> boolean flag, maybe we can add a config class like Grouped,
>> >> > > >>>> Suppressed, etc. For example, let's call it Emitted, which for now
>> >> > > >>>> would just have a single constructor, Emitted.atWindowClose, whose
>> >> > > >>>> semantics are the same as emitFinal == true. I think the benefits are:
>> >> > > >>>>
>> >> > > >>>> 1) You do not need to modify multiple Window classes, but just overload
>> >> > > >>>> one windowedBy function with a second param. This is a smaller scope
>> >> > > >>>> for now, and also more extensible for any future changes.
>> >> > > >>>>
>> >> > > >>>> 2) With a config interface, we keep it extensible and are also able to
>> >> > > >>>> reuse this Emitted interface for other operators if we want to expand
>> >> > > >>>> to them.
>> >> > > >>>>
>> >> > > >>>> ----------------------------
>> >> > > >>>>
>> >> > > >>>> So in general I'm leaning towards option 2). For that, here are some
>> >> > > >>>> more detailed comments:
>> >> > > >>>>
>> >> > > >>>> 1) If we want to reuse that config object for other non-window
>> >> > > >>>> stateful operations, I think naming it `EmitConfig` is probably better
>> >> > > >>>> than `WindowConfig`.
>> >> > > >>>> 2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that
>> >> > > >>>> you are also proposing to add new stores to the public factory
>> >> > > >>>> Stores, but that's not included in the KIP. Is that intentional?
>> >> > > >>>> Personally, I think that although we may eventually want to add a new
>> >> > > >>>> store type to the public APIs, for this KIP we do not have to add them
>> >> > > >>>> and can defer that until we've learned the best way to lay it out. LMK
>> >> > > >>>> what you think.
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> Guozhang
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:
>> >> > > >>>>
>> >> > > >>>>> Hi Dev team,
>> >> > > >>>>>
>> >> > > >>>>> I'd like to start a discussion thread on Kafka Streams KIP-825:
>> >> > > >>>>>
>> >> > > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>> >> > > >>>>>
>> >> > > >>>>> This KIP aims to add new APIs to support outputting final aggregated
>> >> > > >>>>> results for windowed aggregations. I listed several options there, and
>> >> > > >>>>> I'm looking forward to your feedback.
>> >> > > >>>>>
>> >> > > >>>>> Thanks,
>> >> > > >>>>> Hao
>> >> > > >>>>>
>> >> > > >>>>
>> >> > > >>>>
>> >> > > >>>> --
>> >> > > >>>> -- Guozhang
>> >> > > >>>>
>> >> > > >>>
>> >> > > >>>
>> >> > > >>
>> >> > > >
>> >> > > >
>> >> > >
>> >> >
>> >> >
>> >> > --
>> >> > -- Guozhang
>> >> >
>> >>
>> >>
>> >> --
>> >> Thanks,
>> >> Hao
>> >>
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
> -- 
> -- Guozhang
