Hi Hao,

Thank you for the KIP!

Regarding option 1, I would not use `Emitted.onWindowClose()` since that does not seem compatible with the proposed flow. Conceptually, now the flow states that the aggregation is triggered on window close and all aggregation results are emitted. `Emitted` suggests that the aggregation is configured to only emit final results.

Thus, I propose the following:

stream
    .groupBy(..)
    .windowedBy(..)
    .trigger(TriggerParameters.onWindowClose())
    .aggregate(..) //result in a KTable<Windowed<..>>
    .mapValues(..)

An alternative to `trigger()` could be `schedule()`, but I do not really like it.

One thing I noticed with option 1 is that all other methods in the example above are operations on data. `groupBy()` groups, `windowedBy()` partitions, `aggregate()` computes the aggregate, `mapValues()` maps values, even `suppress()` suppresses intermediate results. But what does `trigger()` do? `trigger()` seems a config lost among operations.

However, if we do not want to restrict ourselves to only use methods when we want to specify operations on data, I have the following proposal:

stream
    .groupBy(..)
    .windowedBy(..)
    .onWindowClose()
    .aggregate(..) //result in a KTable<Windowed<..>>
    .mapValues(..)

Best,
Bruno

P.S.: Why is it `windowedBy()` and not `windowBy()`? All other operations also use present tense.

On 22.03.22 06:36, Hao Li wrote:
Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action to emit? How
about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
we can deprecate the `Suppressed` config as a whole later. Or we can
deprecate `Suppressed.untilWindowClose` in a later KIP, after the
implementation of emit final is done.

BTW, isn't

stream
   .groupBy(..)
   .windowBy(..)
   .aggregate(..) //result in a KTable<Windowed<..>>
   .mapValues(..)
   .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition

same as

stream
   .groupBy(..)
   .windowBy(..)
   .trigger(Emitted.onWindowClose)
   .aggregate(..) //result in a KTable<Windowed<..>>
   .mapValues(..)
?

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think the following case is only doable via `suppress`:

stream
   .groupBy(..)
   .windowBy(..)
   .aggregate(..) //result in a KTable<Windowed<..>>
   .mapValues(..)
   .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition
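
As a rough illustration of the "trace back to parent node" idea, here is a minimal sketch in plain Java. Everything in it is hypothetical (`NodeSketch`, `findWindowSizeMs`, `suppressUntilWindowClose`); it is not how the Kafka Streams internals are written, only one way such a check could look: walk from the current node towards the source and accept the suppress step only if some ancestor defines a window.

```java
import java.util.Optional;

// Hypothetical topology node for illustration only; not the Kafka Streams internals.
final class NodeSketch {
    final String name;
    final NodeSketch parent;   // null for the source node
    final Long windowSizeMs;   // non-null only on the node that defines the window

    NodeSketch(String name, NodeSketch parent, Long windowSizeMs) {
        this.name = name;
        this.parent = parent;
        this.windowSizeMs = windowSizeMs;
    }

    // Walks from this node towards the source and returns the nearest window definition, if any.
    Optional<Long> findWindowSizeMs() {
        for (NodeSketch node = this; node != null; node = node.parent) {
            if (node.windowSizeMs != null) {
                return Optional.of(node.windowSizeMs);
            }
        }
        return Optional.empty();
    }

    // Assumed rule: suppressing until window close is only valid downstream of a window definition.
    NodeSketch suppressUntilWindowClose() {
        if (!findWindowSizeMs().isPresent()) {
            throw new IllegalStateException("no window definition upstream of " + name);
        }
        return new NodeSketch("suppress", this, null);
    }
}
```

In this sketch the check succeeds even when `mapValues` sits between the aggregation and the suppress step, which is the case described above as only doable via `suppress`.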


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:

Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method
‘untilWindowClose’. I might not be thinking clearly about it, though. What
does untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
Just my 2c: Suppressed is in `suppress`, whose application scope is much
larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
(but internally we would check whether certain emit policies like
`untilWindowClose` are valid or not), whereas `trigger` as of now is only
applicable in XXWindowedKStream. So I think it would not be completely
replacing Suppressed.untilWindowClose.

In the future, personally I'd still want to keep one control object for
all emit policies, and maybe if we have extended Emitted to the other
emitting policies covered by Suppressed today, we can discuss whether
`KTable.suppress(Emitted..)` could replace `KTable.suppress(Suppressed..)`
as a whole, but for this KIP I think it's too early.


Guozhang


On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the KIP, Hao!

For what it’s worth, I’m also in favor of your latest framing of the API.

I think the name is fine. I assume it’s inspired by Flink? It’s not
identical to the concept of a trigger in Flink, which specifies when to
evaluate the window, which might be confusing to some people who have deep
experience with Flink. Then again, it seems close enough that it should be
clear to casual Flink users. For people with no other stream processing
experience, it might seem a bit esoteric compared to something
self-documenting like ‘emit()’, but the docs should make it clear.

One small question: it seems like this proposal is identical to
Suppressed.untilWindowClose, and the KIP states that this API is superior.
In that case, should we deprecate Suppressed.untilWindowClose?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
Hi Hao,

For 2), I think it's a good idea in general to use a separate function on
the Time/SessionWindowedKStream itself rather than overloading existing
functions; it achieves the same effect, since for now the emitting control
is only for windowed aggregations as in this KIP. We can discuss the actual
function names further, and whether others like the name `trigger` or not.
As for myself, I feel `trigger` is a good one, but I'd like to see if
others have opinions as well.


Guozhang

On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Guozhang,

Thanks for the feedback.

1. I agree to have an `Emitted` control class and two static constructors
named `onWindowClose` and `onEachUpdate`.

2. For the API function changes, I'm thinking of adding a new function
called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
takes an `Emitted` config and returns the same stream. Example:

stream
   .groupBy(...)
   .windowedBy(...)
   .trigger(Emitted.onWindowClose) // N
   .count()

The benefits are:
   1. It's simple and avoids overloading existing functions like
`windowedBy`, `count`, `reduce` or `aggregate`. In fact, to add it to the
`aggregate` functions, we would need to add it to all existing `count` and
`aggregate` overloads, which is a lot.
   2. It operates directly on the windowed KStream and tells how its output
should be configured. If we later need to add this to other types of
streams, we can reuse the same `trigger` API, whereas other types of
streams/tables may not have an `aggregate` or `windowedBy` API to keep it
consistent.
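
To make the shape of this proposal concrete, here is a minimal sketch in plain Java. All names in it (`EmitStrategy`, `EmittedSketch`, `WindowedStreamSketch`) are hypothetical stand-ins and not the Kafka Streams classes, and the default of emitting on every update is an assumption. It only shows the pattern: a control class with two static constructors, and a `trigger` step that returns the same stream type so the aggregation methods need no extra overloads.

```java
// Hypothetical stand-ins for illustration only; these are not the Kafka Streams classes.
enum EmitStrategy { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

/** Sketch of an `Emitted`-style control class with two static constructors. */
final class EmittedSketch {
    private final EmitStrategy strategy;

    private EmittedSketch(EmitStrategy strategy) {
        this.strategy = strategy;
    }

    static EmittedSketch onWindowClose() {
        return new EmittedSketch(EmitStrategy.ON_WINDOW_CLOSE);
    }

    static EmittedSketch onEachUpdate() {
        return new EmittedSketch(EmitStrategy.ON_EACH_UPDATE);
    }

    EmitStrategy strategy() {
        return strategy;
    }
}

/** Sketch of a windowed stream whose `trigger` only records configuration. */
final class WindowedStreamSketch {
    private final EmitStrategy strategy;

    WindowedStreamSketch() {
        this(EmitStrategy.ON_EACH_UPDATE); // assumed default: emit on every update
    }

    private WindowedStreamSketch(EmitStrategy strategy) {
        this.strategy = strategy;
    }

    // Returns the same stream type, so count(), reduce() and aggregate() need no new overloads.
    WindowedStreamSketch trigger(EmittedSketch emitted) {
        return new WindowedStreamSketch(emitted.strategy());
    }

    // Stand-in for an aggregation; it just reports which emit policy it would apply.
    String count() {
        return "count[" + strategy + "]";
    }
}
```

With this shape, `new WindowedStreamSketch().trigger(EmittedSketch.onWindowClose()).count()` picks up the configured policy, while calling `count()` directly keeps the assumed default.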

Hao


On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

I prefer option 2 over the other options, mainly because the added config
object could potentially be used in other operators as well (it does not
necessarily have to be a windowed operator and hence piggy-backed on
`windowedBy`, and that's also why I suggested not naming it `WindowConfig`
but just `EmitConfig`).

As for Matthias' question, I think the difference between the windowed
aggregate operator and the stream-stream join operator is that, for the
latter, we think emit-final should be the only right emitting policy and
hence we should not let users configure it. If users configure it to e.g.
emit eagerly, they may get the old spurious emitting behavior, which
violates the semantics.

For option 2) itself, I have a few more thoughts:

1. Thinking about Matthias' suggestions, I'm also leaning a bit towards
adding the new param in an overloaded `aggregate` rather than an
overloaded `windowBy` function. The reason is that the emitting logic could
be either window based or non-window based in the long run. Though for this
KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may want
to extend it to other non-windowed operators in the future.
2. To be consistent with other control class names, I feel maybe we can
name it "Emitted", not "EmitConfig".
3. Following the first comment, I think we can have the static constructor
names as "onWindowClose" and "onEachUpdate".

The resulting code pattern would be like this:

    stream
      .groupBy(..)
      .windowBy(TimeWindow..)
      .count(Emitted.onWindowClose)

WDYT?


On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

I see; making it a config seems better. Frankly, I am not even sure if we
need a config at all or if we can just hard code it? For the stream-stream
left/outer join fix, there is only an internal config but no public config
either.

Option 1: Your proposal is?

    stream
      .groupByKey()
      .windowBy(TimeWindow.ofSizeNoGrace(...))
      .configure(EmitConfig.emitFinal())
      .count()

That does not change my argument that it seems to be misplaced from an API
flow POV.

Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. It might be
good if others could chime in, too. I think I slightly prefer option 2 over
option 3.


-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:
Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be outputted to `KTable` after aggregation.
But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal`.

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.

For putting `EmitConfig` in aggregation operator, I think it doesn't
control how we do aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself already?

(On the other hand, if I think about it once more, maybe the `grace-period`
is actually not a property of the window but a property of the aggregation
operator? _thinking_)

From an API flow point of view, option 1 might not be desirable IMHO:

     stream
       .groupByKey()
       .windowBy(TimeWindow.ofSizeNoGrace(...))
       .emitFinal()
       .count()

The call to `emitFinal()` seems not to be in the right place for this case?


Option 2 might work (I think we need to discuss a few details of the API
though):

     stream
       .groupByKey()
       .windowBy(
         TimeWindow.ofSizeNoGrace(...),
         EmitConfig.emitFinal() // just made this up; it's not in the KIP
       )
       .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's unclear
what API you have in mind? `EmitFinalConfig` has no public constructor nor
any builder method.


For option 3, I am not sure what you really have in mind. Can you give a
concrete example (similar to above) of how users would write their code?



Did you consider actually passing the `EmitConfig` into the aggregation
operator? In the end, it seems not to be a property of the window
definition or windowing step, but a property of the actual operator:

     stream
       .groupByKey()
       .windowBy(
         TimeWindow.ofSizeNoGrace(...)
       )
       .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this case
though...


-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:
Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the
current `windowedBy` API but configures `EmitConfig` separately? The
benefit of option 1 is that if we need to configure something else later,
we don't need to pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
.
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

Thanks for the proposal. I have some preferences among the options, so I
will copy them here:

I'm now thinking it may be better not to add this new config on each of the
Window interfaces, but instead to add it at the KGroupedStream#windowedBy
function. Also, instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc., e.g. let's call it
Emitted, which for now would just have a single construct,
Emitted.atWindowClose, whose semantics is the same as emitFinal == true. I
think the benefits are:

1) You do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is a smaller scope for now,
and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility and are also able
to reuse this Emitted interface for other operators if we want to expand
to them.

----------------------------

So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it `EmitConfig` is probably better than
`WindowConfig`.
2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you
are also proposing to add new stores into the public factory Stores, but
that is not included in the KIP. Is that intentional? Personally I think
that although we may eventually want to add a new store type to the public
APIs, for this KIP maybe we do not have to add them but can delay that
until after we've learned the best way to lay it out. LMK what you think.



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there, and I'm
looking forward to your feedback.
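
For readers skimming the thread, the difference between emitting on every update and emitting only final results can be sketched with a toy model in plain Java. This is a hypothetical illustration, not Kafka Streams code: `WindowedCountSketch` and its fields are made up, and it assumes a single key, tumbling windows, a zero grace period, non-negative timestamps, and stream time equal to the highest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model for illustration only; not Kafka Streams code.
// Assumptions: one key, tumbling windows, zero grace period, non-negative timestamps.
final class WindowedCountSketch {
    private final long windowSizeMs;
    private final boolean emitOnWindowClose;
    private final TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> count
    private long streamTime = 0L;                               // highest timestamp seen so far
    final List<String> emitted = new ArrayList<>();

    WindowedCountSketch(long windowSizeMs, boolean emitOnWindowClose) {
        this.windowSizeMs = windowSizeMs;
        this.emitOnWindowClose = emitOnWindowClose;
    }

    void process(long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (windowStart + windowSizeMs <= streamTime) {
            return; // the window has already closed, so the late record is dropped
        }
        long count = counts.merge(windowStart, 1L, Long::sum);
        streamTime = Math.max(streamTime, timestampMs);

        if (!emitOnWindowClose) {
            emitted.add(format(windowStart, count)); // one result per update
            return;
        }
        // Emit and forget every window whose end is at or before stream time.
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            if (entry.getKey() + windowSizeMs > streamTime) {
                break;
            }
            emitted.add(format(entry.getKey(), entry.getValue()));
            it.remove();
        }
    }

    private String format(long windowStart, long count) {
        return "[" + windowStart + "," + (windowStart + windowSizeMs) + ")=" + count;
    }
}
```

Feeding timestamps 1, 5, 12, 3, 25 into a 10 ms window, the per-update mode emits a result for each accepted record, while the on-close mode emits only the final counts of the two windows that have closed; the window `[20,30)` is still open and emits nothing yet in that mode.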

Thanks,
Hao


