Hi Hao,

I agree with Guozhang: Great summary! Thank you!

Regarding "aligned with other config class names", there is this DSL grammar John once specified
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
and we have already used it in the code. I found the grammar quite useful.

I am undecided whether option 1 is really worth it. What are the actual arguments in favor of it? Is it only that we do not need to overload other methods? That does not seem worth breaking DSL principles for. An alternative proposal would be to go with option 2 and conform to the grammar above:

<W extends Window> TimeWindowedKStream<K, V> windowedBy(final Windows<W> windows, WindowedByParameters parameters);

TimeWindowedKStream<K, V> windowedBy(final SlidingWindows windows, WindowedByParameters parameters);

SessionWindowedKStream<K, V> windowedBy(final SessionWindows windows, WindowedByParameters parameters);

This is similar to option 2 in the KIP, but it ensures that we put all future needed configs in the parameters object and we do not need to overload the methods anymore.

Then if we also get KAFKA-10298 done, we could even collapse all `windowedBy()` methods into one.
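
To make the parameters-object idea concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams library: the `EmitStrategy` enum, the `emitOnEachUpdate()`/`emitOnWindowClose()` factories, the `withName()` setting and the string-returning `windowedBy()` stand-in are all hypothetical names made up for illustration, not the KIP's API. Only the `WindowedByParameters` class name comes from the signatures above.

```java
import java.util.Optional;

final class WindowedBySketch {
    // Stand-in for a single windowedBy(windows, parameters) entry point.
    // It only renders what it was given; a real operator would build a stream.
    static String windowedBy(String windows, WindowedByParameters parameters) {
        return windows + "[emit=" + parameters.emitStrategy()
            + ", name=" + parameters.name().orElse("<none>") + "]";
    }

    public static void main(String[] args) {
        System.out.println(windowedBy("TimeWindows",
            WindowedByParameters.emitOnWindowClose().withName("clicks")));
    }
}

// Hypothetical parameters object: every optional setting for windowedBy()
// lives here, so a new setting does not require a new overload.
final class WindowedByParameters {
    enum EmitStrategy { ON_EACH_UPDATE, ON_WINDOW_CLOSE }

    private final EmitStrategy emitStrategy;
    private final Optional<String> name;

    private WindowedByParameters(EmitStrategy emitStrategy, Optional<String> name) {
        this.emitStrategy = emitStrategy;
        this.name = name;
    }

    static WindowedByParameters emitOnEachUpdate() {
        return new WindowedByParameters(EmitStrategy.ON_EACH_UPDATE, Optional.empty());
    }

    static WindowedByParameters emitOnWindowClose() {
        return new WindowedByParameters(EmitStrategy.ON_WINDOW_CLOSE, Optional.empty());
    }

    // A setting added later becomes one more method here (made-up example).
    WindowedByParameters withName(String name) {
        return new WindowedByParameters(emitStrategy, Optional.of(name));
    }

    EmitStrategy emitStrategy() { return emitStrategy; }
    Optional<String> name() { return name; }
}
```

The point of the sketch is only that the method signature stays fixed while the parameters object grows.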

Best,
Bruno

On 22.03.22 22:31, Guozhang Wang wrote:
Thanks for the great summary Hao. I'm still leaning towards option 2)
here, and I'm in favor of `trigger` as function name, and `Triggered` as
config class name (mainly to be aligned with other config class names).
I'd also like to see others' preferences between the options, as well as
the namings.


Guozhang



On Tue, Mar 22, 2022 at 12:23 PM Hao Li <h...@confluent.io.invalid> wrote:

`windowedStream.onWindowClose()` was the original option 1
(`windowedStream.emitFinal()`) but was rejected
because we could add more emit types and this will result in adding more
functions. I still prefer the
"windowedStream.someFunc(Controlled.onWindowClose)"
model since it's flexible and clear that it's configuring the emit policy.
Let me summarize all the naming options we have and compare:

*API function name:*

*1. `windowedStream.trigger()`*
     Pros:
        i. Simple
        ii. Similar to Flink's trigger function (is this a con actually?)
     Cons:
        i. `trigger()` can be confused with Flink trigger (raised by John)
        ii. `trigger()` feels like an operation instead of a configure function (raised by Bruno)?

*2. `windowedStream.emitTrigger()`*
      Pros:
        i. Avoid confusion from Flink's trigger API
        ii. `emitTrigger` feels like configuring the trigger because "trigger" here is a noun instead of a verb as in `trigger()`
      Cons:
        i. Verbose?
        ii. Not consistent with `Suppressed.untilWindowClose`?


*Config class/object name:*

1. *`Emitted.onWindowClose()`* and *`Emitted.onEachUpdate()`*
      Cons:
      i. Doesn't go along with `trigger` (raised by Bruno)

2. *`Triggered.onWindowClose()`* and *`Triggered.onEachUpdate()`*

3. *`EmitTrigger.onWindowClose()`* and *`EmitTrigger.onEachUpdate()`*

4. *`(Emit|Trigger)(Config|Policy).onWindowClose()`* and
*`(Emit|Trigger)(Config|Policy).onEachUpdate()`*
      This is a combination of different names like: `EmitConfig`,
`EmitPolicy`, `TriggerConfig` and `TriggerPolicy`...


If we are settled with option 1), we can add new options to these names and
comment on their Pros and Cons.

Hao





On Tue, Mar 22, 2022 at 10:48 AM Guozhang Wang <wangg...@gmail.com> wrote:

I see what you mean now, and I think it's a fair point that composing
`trigger` and `emitted` seems awkward.

Re: data process operator v.s. control operator, I share your concern as
well, and here's my train of thought: having only data process operators
was my primary motivation for how we added the suppress operator --- it
indeed "suppresses" data. But in hindsight its disadvantage is that, for
example, Suppressed.onWindowClose() should only relate to an earlier
windowedBy operator, which is possibly very far from it in the resulting
DSL code. It's not only a bit awkward for users to write such code, but in
such cases the DSL builder also needs to maintain and propagate this
information to the suppress operator further down. So we are now thinking
about "putting the control object as close as possible to where the
related processing really happens". In that world my original preference
was somewhere in option 2), i.e. just put the control as a param of the
related "windowedBy" operator, but the trade-off is that we keep adding
overloaded functions to these operators. So after some back-and-forth
thinking I'm leaning towards relaxing our principle of having only
processing operators and no flow-control operators. That being said, if
you have any ideas for how we can get the benefits of both worlds, I'm all
ears.

Re: using a direct function like "windowedStream.onWindowClose()" v.s.
"windowedStream.someFunc(Controlled.onWindowClose)", again my motivation
for the latter is extensibility without adding more functions in the
future. If people feel this is not worth it we can do the first option as
well. If we just feel that `trigger` and `emitted` do not compose well
together, maybe we can consider something like
`windowedStream.trigger(Triggered.onWindowClose())`?

Re: windowedBy v.s. windowBy, yeah I do not really have a good reason why
we should use the past tense either :P But if it's not bothering people
much I'd say we just keep it rather than deprecate/rename APIs.


On Tue, Mar 22, 2022 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Guozhang,

There is no semantic difference. It is a cosmetic difference.
Conceptually, I relate `Emitted` with the aggregation and not with
`trigger()` in the API flow, because the aggregation emits the result
not `trigger()`. Therefore, I proposed to not use `Emitted` as the name
of the config object passed to `trigger()`.


Best,
Bruno

On 22.03.22 17:24, Guozhang Wang wrote:
Hi Bruno,

Could you elaborate a bit more here: what's the semantic difference between
"the aggregation is triggered on window close and all aggregation results
are emitted." for trigger(TriggerParameters.onWindowClose()), and "the
aggregation is configured to only emit final results." for
trigger(Emitted.onWindowClose())?

On Tue, Mar 22, 2022 at 4:19 AM Bruno Cadonna <cado...@apache.org> wrote:

Hi Hao,

Thank you for the KIP!

Regarding option 1, I would not use `Emitted.onWindowClose()` since that
does not seem compatible with the proposed flow. Conceptually, the flow now
states that the aggregation is triggered on window close and all
aggregation results are emitted. `Emitted` suggests that the aggregation
is configured to only emit final results.

Thus, I propose the following:

stream
       .groupBy(..)
       .windowedBy(..)
       .trigger(TriggerParameters.onWindowClose())
       .aggregate(..) //result in a KTable<Windowed<..>>
       .mapValues(..)

An alternative to `trigger()` could be `schedule()`, but I do not really
like it.

One thing I noticed with option 1 is that all other methods in the
example above are operations on data. `groupBy()` groups, `windowedBy()`
partitions, `aggregate()` computes the aggregate, `mapValues()` maps
values, even `suppress()` suppresses intermediate results. But what does
`trigger()` do? `trigger()` seems a config lost among operations.

However, if we do not want to restrict ourselves to only use methods
when we want to specify operations on data, I have the following
proposal:

stream
       .groupBy(..)
       .windowedBy(..)
       .onWindowClose()
       .aggregate(..) //result in a KTable<Windowed<..>>
       .mapValues(..)

Best,
Bruno

P.S.: Why is it `windowedBy()` and not `windowBy()`? All other
operations also use present tense.

On 22.03.22 06:36, Hao Li wrote:
Hi John,

Yes. For naming, `trigger` is similar to Flink's trigger, but it has a
different meaning in our case. `emit` sounds like an action to emit? How
about `emitTrigger`? I'm open to suggestions for the naming.

For deprecating `Suppressed.untilWindowClose`, I agree with Guozhang that
we can deprecate the `Suppressed` config as a whole later. Or we can
deprecate `Suppressed.untilWindowClose` in a later KIP after the
implementation of emit final is done.

BTW, isn't

stream
     .groupBy(..)
     .windowBy(..)
     .aggregate(..) //result in a KTable<Windowed<..>>
     .mapValues(..)
     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition

same as

stream
     .groupBy(..)
     .windowBy(..)
     .trigger(Emitted.onWindowClose)
     .aggregate(..) //result in a KTable<Windowed<..>>
     .mapValues(..)
?

Hao


On Mon, Mar 21, 2022 at 7:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think the following case is only doable via `suppress`:

stream
     .groupBy(..)
     .windowBy(..)
     .aggregate(..) //result in a KTable<Windowed<..>>
     .mapValues(..)
     .suppress(Suppressed.untilWindowClose) // since we can trace back to parent node, to find a window definition


Guozhang


On Mon, Mar 21, 2022 at 6:36 PM John Roesler <vvcep...@apache.org> wrote:

Thanks, Guozhang!

To clarify, I was asking specifically about deprecating just the method
‘untilWindowClose’. I might not be thinking clearly about it, though. What
does untilWindowClose do that this KIP doesn’t cover?

Thanks,
John

On Mon, Mar 21, 2022, at 20:31, Guozhang Wang wrote:
Just my 2c: Suppressed is in `suppress` whose application scope is much
larger and hence more flexible. I.e. it can be used anywhere for a `KTable`
(but internally we would check whether certain emit policies like
`untilWindowClose` are valid or not), whereas `trigger` as for now is only
applicable in XXWindowedKStream. So I think it would not be completely
replacing Suppressed.untilWindowClose.
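
The scope difference described here can be pictured with a small, self-contained sketch. The `Table` and `WindowedStream` classes and their methods are hypothetical stand-ins, not Kafka Streams types; they only illustrate the contrast between a policy that is validated at runtime and one that is restricted by the type it is declared on.

```java
final class ScopeSketch {
    // Stand-in for a KTable; it only remembers whether a window
    // definition exists somewhere upstream.
    static final class Table {
        private final boolean windowedUpstream;

        Table(boolean windowedUpstream) {
            this.windowedUpstream = windowedUpstream;
        }

        // Like suppress(): callable on any table, so the window-close
        // policy has to be validated at runtime.
        Table suppressUntilWindowClose() {
            if (!windowedUpstream) {
                throw new IllegalStateException("no window definition found upstream");
            }
            return this;
        }
    }

    // Stand-in for a windowed stream.
    static final class WindowedStream {
        // Like trigger(): the method exists only on the windowed type,
        // so calling it on a non-windowed table would not compile.
        WindowedStream triggerOnWindowClose() {
            return this;
        }

        Table count() {
            return new Table(true);
        }
    }

    public static void main(String[] args) {
        new WindowedStream().triggerOnWindowClose().count();
        new WindowedStream().count().suppressUntilWindowClose();
        try {
            new Table(false).suppressUntilWindowClose();
        } catch (IllegalStateException e) {
            System.out.println("rejected at runtime: " + e.getMessage());
        }
    }
}
```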

In the future, personally I'd still want to keep one control object for
all emit policies, and maybe if we have extended Emitted for other emitting
policies covered by Suppressed today, we can discuss if we could have
`KTable.suppress(Emitted..)` replacing `KTable.suppress(Suppressed..)` as a
whole, but for this KIP I think it's too early.


Guozhang


On Mon, Mar 21, 2022 at 6:18 PM John Roesler <vvcep...@apache.org> wrote:

Hi all,

Thanks for the KIP, Hao!

For what it’s worth, I’m also in favor of your latest framing of the API.

I think the name is fine. I assume it’s inspired by Flink? It’s not
identical to the concept of a trigger in Flink, which specifies when to
evaluate the window, which might be confusing to some people who have deep
experience with Flink. Then again, it seems close enough that it should be
clear to casual Flink users. For people with no other stream processing
experience, it might seem a bit esoteric compared to something
self-documenting like ‘emit()’, but the docs should make it clear.

One small question: it seems like this proposal is identical to
Suppressed.untilWindowClose, and the KIP states that this API is superior.
In that case, should we deprecate Suppressed.untilWindowClose?

Thanks,
John

On Mon, Mar 21, 2022, at 19:30, Guozhang Wang wrote:
Hi Hao,

For 2), I think it's a good idea in general to use a separate function on
the Time/SessionWindowedKStream itself rather than overloading existing
functions, to achieve the same effect that, for now, the emitting control
is only for windowed aggregations as in this KIP. We can discuss the actual
function names further, whether others like the name `trigger` or not. As
for myself I feel `trigger` is a good one but I'd like to see if others
have opinions as well.


Guozhang

On Mon, Mar 21, 2022 at 5:18 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Guozhang,

Thanks for the feedback.

1. I agree to have an `Emitted` control class and two static constructors
named `onWindowClose` and `onEachUpdate`.

2. For the API function changes, I'm thinking of adding a new function
called `trigger` to `TimeWindowedKStream` and `SessionWindowedKStream`. It
takes an `Emitted` config and returns the same stream. Example:

stream
     .groupBy(...)
     .windowedBy(...)
     .trigger(Emitted.onWindowClose). // N
     .count()

The benefits are:
     1. It's simple and avoids creating overloads of existing functions
like `windowedBy` or `count`, `reduce` or `aggregate`. In fact, to add it
to the `aggregate` functions, we would need to add it to all existing
`count` and `aggregate` overloads, which is a lot.
     2. It operates directly on the windowed kstream and tells how its
output should be configured. If we later need to add this to other types
of streams, we can reuse the same `trigger` API, whereas other types of
streams/tables may not have an `aggregate` or `windowedBy` API to make it
consistent.
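
As a rough picture of this shape, the following self-contained sketch models a `trigger` call that records the emit policy and hands back a stream of the same type, so the terminal operations keep their existing signatures. The names `Emitted`, `onWindowClose`, `onEachUpdate` and `trigger` come from the discussion; everything else (the `WindowedStreamSketch` class, the string-returning `count()`, the assumed default of emitting on each update) is a hypothetical stand-in and not the Kafka Streams implementation.

```java
// Stand-in for a windowed stream; not the real TimeWindowedKStream interface.
final class WindowedStreamSketch {
    private final Emitted emitted;

    // Assumption for this sketch: without trigger(), results are emitted
    // on each update.
    WindowedStreamSketch() {
        this(Emitted.onEachUpdate());
    }

    private WindowedStreamSketch(Emitted emitted) {
        this.emitted = emitted;
    }

    // trigger() only records the emit policy and returns a stream of the
    // same type, so count()/aggregate() need no extra overloads.
    WindowedStreamSketch trigger(Emitted emitted) {
        return new WindowedStreamSketch(emitted);
    }

    // Stand-in for an aggregation; it just reports the policy it would use.
    String count() {
        return "count(emit=" + emitted.type() + ")";
    }

    public static void main(String[] args) {
        System.out.println(new WindowedStreamSketch().trigger(Emitted.onWindowClose()).count());
        System.out.println(new WindowedStreamSketch().count());
    }
}

// Hypothetical control object with the two static constructors named in the thread.
final class Emitted {
    enum Type { ON_WINDOW_CLOSE, ON_EACH_UPDATE }

    private final Type type;

    private Emitted(Type type) {
        this.type = type;
    }

    static Emitted onWindowClose() {
        return new Emitted(Type.ON_WINDOW_CLOSE);
    }

    static Emitted onEachUpdate() {
        return new Emitted(Type.ON_EACH_UPDATE);
    }

    Type type() {
        return type;
    }
}
```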

Hao


On Sat, Mar 19, 2022 at 5:40 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

I prefer option 2 over the other options mainly because the added
config object could potentially be used in other operators as well (it
does not necessarily have to be a windowed operator and hence does not
have to be piggy-backed on `windowedBy`, and that's also why I suggested
not naming it `WindowConfig` but just `EmitConfig`).

As for Matthias' question, I think the difference between the windowed
aggregate operator and the stream-stream join operator is that, for the
latter, we think emit-final should be the only right emitting policy and
hence we should not let users configure it. If users configure it to
e.g. emit eagerly they may get the old spurious emitting behavior, which
violates the semantics.

For option 2) itself, I have a few more thoughts:

1. Thinking about Matthias' suggestions, I'm also leaning a bit towards
adding the new param in the overloaded `aggregate` rather than the
overloaded `windowBy` function. The reason is that the emitting logic could
be either window based or non-window based in the long run. Though for
this KIP we could just add it in `XXXWindowedKStream.aggregate()`, we may
want to extend it to other non-windowed operators in the future.
2. To be consistent with other control class names, I feel maybe we can
name it "Emitted", not "EmitConfig".
3. Following the first comment, I think we can have the static constructor
names as "onWindowClose" and "onEachUpdate".

The resulting code pattern would be like this:

      stream
        .groupBy(..)
        .windowBy(TimeWindow..)
        .count(Emitted.onWindowClose)

WDYT?


On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

I see; making it a config seems better. Frankly, I am not even sure if
we need a config at all or if we can just hard code it? For the
stream-stream left/outer join fix, there is only an internal config
but no public config either.
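
For illustration, reading such a setting from properties instead of the DSL could look like the following self-contained sketch. The key name `example.emit.final.interval.ms` and the 1000 ms default are invented for this example; the thread does not define a config name or default, and this is not a real Kafka Streams config.

```java
import java.util.Properties;

final class EmitIntervalConfigSketch {
    // Hypothetical key and default; neither is a real Kafka Streams config.
    static final String EMIT_INTERVAL_MS_KEY = "example.emit.final.interval.ms";
    static final long DEFAULT_EMIT_INTERVAL_MS = 1000L;

    // Returns how often (in ms) we would try to emit final results,
    // falling back to the default when the key is not set.
    static long emitIntervalMs(Properties props) {
        String raw = props.getProperty(EMIT_INTERVAL_MS_KEY);
        return raw == null ? DEFAULT_EMIT_INTERVAL_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(emitIntervalMs(props));
        // Changing the value needs no change to the topology code.
        props.setProperty(EMIT_INTERVAL_MS_KEY, "250");
        System.out.println(emitIntervalMs(props));
    }
}
```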

Option 1: Your proposal is?

      stream
        .groupByKey()
        .windowBy(TimeWindow.ofSizeNoGrace(...))
        .configure(EmitConfig.emitFinal())
        .count()

That does not change my argument that it seems to be misplaced from an
API flow POV.

Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. Might be good
if others could chime in, too. I think I slightly prefer option 2 over
option 3.


-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:
Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be outputted to `KTable` after aggregation.
But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal`

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.

For putting `EmitConfig` in aggregation operator, I think it doesn't
control how we do aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself
already?

(On the other hand, if I think about it once more, maybe the
`grace-period` is actually not a property of the window but a property
of the aggregation operator? _thinking_)

From an API flow point of view, option 1 might not be desirable IMHO:

       stream
         .groupByKey()
         .windowBy(TimeWindow.ofSizeNoGrace(...))
         .emitFinal()
         .count()

The call to `emitFinal()` seems not to be in the right place for this
case?


Option 2 might work (I think we need to discuss a few details of the API
though):

       stream
         .groupByKey()
         .windowBy(
           TimeWindow.ofSizeNoGrace(...),
           EmitConfig.emitFinal() // just made this up; it's not in the KIP
         )
         .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
unclear what API you have in mind? `EmitFinalConfig` has no public
constructor nor any builder method.


For option 3, I am not sure what you really have in mind. Can you give
a concrete example (similar to above) of how users would write their code?



Did you consider actually passing the `EmitConfig` into the aggregation
operator? In the end, it seems not to be a property of the window
definition or windowing step, but a property of the actual operator:

       stream
         .groupByKey()
         .windowBy(
           TimeWindow.ofSizeNoGrace(...)
         )
         .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this
case though...


-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:
Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the
current `windowedBy` api but configures `EmitConfig` separately? The
benefit of option 1 is that if we need to configure something else later,
we don't need to pile it on `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

Thanks for the proposal. I have some preferences among the options here, so
I will copy them here:

I'm now thinking if it's better to not add this new config on each of the
Window interfaces, but instead add that at the KGroupedStream#windowedBy
function. Also instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc, e.g. let's call it
Emitted, which for now would just have a single constructor,
Emitted.atWindowClose, whose semantics is the same as emitFinal == true. I
think the benefits are:

1) you do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is less of a scope for now,
and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being
able to reuse this Emitted interface for other operators if we wanted to
expand to them.

----------------------------

So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it as `EmitConfig` is probably better than
`WindowConfig`.
2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you
are also proposing to add new stores into the public factory Stores, but
it's not included in the KIP. Is that intentional? Personally I think that
although we may eventually want to add a new store type to the public APIs,
for this KIP maybe we do not have to add them but can delay for later after
we've learned the best way to lay them out. LMK what you think.



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there and I'm
looking forward to your feedback.
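
To make the difference between emitting on each update and emitting only final results concrete, here is a small, self-contained simulation. It is a simplified model and not Kafka Streams code: it assumes non-negative timestamps, tumbling windows with no grace period, and a window that closes once stream time (the largest timestamp seen so far) reaches the window end. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class EmitFinalSketch {
    // Counts records per tumbling window [start, start + size) and returns
    // the emitted "windowStart=count" results in emission order.
    static List<String> countPerWindow(long[] timestamps, long size, boolean emitOnWindowClose) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> running count
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % size);
            if (start + size <= streamTime) {
                continue; // window already closed: the late record is dropped
            }
            int count = open.merge(start, 1, Integer::sum);
            if (!emitOnWindowClose) {
                out.add(start + "=" + count); // one result per update
            }
            // Close every window whose end is at or before stream time.
            while (!open.isEmpty() && open.firstKey() + size <= streamTime) {
                Map.Entry<Long, Integer> closed = open.pollFirstEntry();
                if (emitOnWindowClose) {
                    out.add(closed.getKey() + "=" + closed.getValue()); // final result only
                }
            }
        }
        // Windows still open here are not emitted in window-close mode,
        // because no later record advanced stream time past their end.
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 2, 11, 12, 25};
        System.out.println(countPerWindow(timestamps, 10, false));
        System.out.println(countPerWindow(timestamps, 10, true));
    }
}
```

With the sample input, the each-update mode produces five results (one per record), while the window-close mode produces one result per closed window and nothing for the window that is still open at the end.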

Thanks,
Hao





