`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

I see; making it a config seems better. Frankly, I am not even sure if we need a config at all or if we can just hard code it? For the stream-stream join left/outer join fix, there is only an internal config but no public config either.

Option 1: Your proposal is?

  stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
    .configure(EmitConfig.emitFinal())
    .count()

That does not change my argument that it seems to be misplaced from an API flow POV.

Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. It might be good if others could chime in, too. I think I slightly prefer option 2 over option 3.


-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:
Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.
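
To illustrate the point about properties, here is a minimal sketch of reading such a setting at runtime instead of fixing it in the DSL. The key `emit.final.interval.ms`, its default value, and the helper class are all hypothetical; none of them is taken from the KIP or from `StreamsConfig`.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka Streams.
final class EmitIntervalSketch {
    // Assumed config key and default, made up for this illustration.
    static final String EMIT_FINAL_INTERVAL_MS_CONFIG = "emit.final.interval.ms";
    static final long DEFAULT_EMIT_FINAL_INTERVAL_MS = 1000L;

    // Returns the configured interval, or the default when the key is absent.
    static long emitFinalIntervalMs(Properties props) {
        String raw = props.getProperty(EMIT_FINAL_INTERVAL_MS_CONFIG);
        return raw == null ? DEFAULT_EMIT_FINAL_INTERVAL_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Changing this value only needs a new properties file, not a new topology.
        props.setProperty(EMIT_FINAL_INTERVAL_MS_CONFIG, "250");
        System.out.println(emitFinalIntervalMs(props));
    }
}
```

The topology code stays the same across deployments; only the properties passed at startup differ.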

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be outputted to `KTable` after aggregation.
But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, the config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal()`.

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.
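
A minimal sketch of how option 3 could read from the user's side. `EmitConfigSketch` and `TimeWindowsSketch` are hypothetical stand-ins so that the snippet compiles without Kafka on the classpath, and the two-argument factory is an assumption, not an existing `TimeWindows` method.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for the proposed EmitConfig.
final class EmitConfigSketch {
    private final boolean emitFinal;

    private EmitConfigSketch(boolean emitFinal) {
        this.emitFinal = emitFinal;
    }

    // Only final results are emitted, once the window closes.
    static EmitConfigSketch emitFinal() {
        return new EmitConfigSketch(true);
    }

    boolean isEmitFinal() {
        return emitFinal;
    }
}

// Hypothetical stand-in for a window definition that carries the emit config.
final class TimeWindowsSketch {
    private final Duration size;
    private final EmitConfigSketch emitConfig;

    private TimeWindowsSketch(Duration size, EmitConfigSketch emitConfig) {
        this.size = Objects.requireNonNull(size, "size");
        this.emitConfig = Objects.requireNonNull(emitConfig, "emitConfig");
    }

    // Assumed overload: the emit config travels with the window itself.
    static TimeWindowsSketch ofSizeWithNoGrace(Duration size, EmitConfigSketch emitConfig) {
        return new TimeWindowsSketch(size, emitConfig);
    }

    Duration size() {
        return size;
    }

    EmitConfigSketch emitConfig() {
        return emitConfig;
    }
}
```

With these stand-ins, the user-facing call would be `TimeWindowsSketch.ofSizeWithNoGrace(Duration.ofMinutes(5), EmitConfigSketch.emitFinal())`, passed to `windowedBy` as usual. Every window class would need a matching overload under this option.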

For putting `EmitConfig` in aggregation operator, I think it doesn't
control how we do aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself already?

(On the other hand, if I think about it once more, maybe the
`grace-period` is actually not a property of the window but a property
of the aggregation operator? _thinking_)

From an API flow point of view, option 1 might not be desirable IMHO:

    stream
      .groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(...))
      .emitFinal()
      .count()

The call to `emitFinal()` seems not to be in the right place for this case?


Option 2 might work (I think we need to discuss a few details of the API
though):

    stream
      .groupByKey()
      .windowedBy(
        TimeWindows.ofSizeWithNoGrace(...),
        EmitConfig.emitFinal() // just made this up; it's not in the KIP
      )
      .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
unclear what API you have in mind? `EmitFinalConfig` has no public
constructor nor any builder method.


For option 3, I am not sure what you really have in mind. Can you give
a concrete example (similar to above) of how users would write their code?



Did you consider actually passing the `EmitConfig` into the
aggregation operator? In the end, it seems not to be a property of the
window definition or windowing step, but a property of the actual operator:

    stream
      .groupByKey()
      .windowedBy(
        TimeWindows.ofSizeWithNoGrace(...)
      )
      .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this
case though...


-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:
Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the
current `windowedBy` API but configures `EmitConfig` separately? The
benefit of option 1 is that if we need to configure something else later,
we don't need to pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231

However, we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Hao,

Thanks for the proposal. I have some preference among the options here, so
I will copy them here:

I'm now thinking whether it's better not to add this new config on each of
the Window interfaces, but instead add it at the KGroupedStream#windowedBy
function. Also, instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc. For example, let's call it
Emitted, which for now would just have a single construct,
Emitted.atWindowClose, whose semantics are the same as emitFinal == true.
I think the benefits are:

1) You do not need to modify multiple Window classes, but just overload
one windowedBy function with a second param. This is less of a scope for
now, and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being
able to reuse this Emitted interface for other operators if we want to
expand to them.
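
A minimal sketch of the config-object idea described above, using the names floated in this thread. `Emitted`, its `Strategy` enum, `onEveryUpdate()`, and the `GroupedStreamSketch` stand-in are hypothetical and only illustrate the shape (private constructor, static factories, one overloaded `windowedBy`); they are not Kafka Streams classes.

```java
import java.util.Objects;

// Hypothetical config object in the style of Grouped/Suppressed.
final class Emitted {
    // Assumed strategies; the thread only proposes the window-close one.
    enum Strategy { ON_EVERY_UPDATE, AT_WINDOW_CLOSE }

    private final Strategy strategy;

    // Private constructor: instances come from the static factories only.
    private Emitted(Strategy strategy) {
        this.strategy = Objects.requireNonNull(strategy, "strategy");
    }

    // Same semantics as emitFinal == true.
    static Emitted atWindowClose() {
        return new Emitted(Strategy.AT_WINDOW_CLOSE);
    }

    // Assumed name for the existing default behaviour.
    static Emitted onEveryUpdate() {
        return new Emitted(Strategy.ON_EVERY_UPDATE);
    }

    Strategy strategy() {
        return strategy;
    }
}

// Hypothetical result of windowedBy; it just records what was configured.
final class WindowedStreamSketch {
    private final String windowSpec;
    private final Emitted emitted;

    WindowedStreamSketch(String windowSpec, Emitted emitted) {
        this.windowSpec = Objects.requireNonNull(windowSpec, "windowSpec");
        this.emitted = Objects.requireNonNull(emitted, "emitted");
    }

    String windowSpec() {
        return windowSpec;
    }

    Emitted emitted() {
        return emitted;
    }
}

// Hypothetical stand-in for the grouped stream with one extra overload.
final class GroupedStreamSketch {
    // Existing-style call keeps the default emit behaviour.
    static WindowedStreamSketch windowedBy(String windowSpec) {
        return windowedBy(windowSpec, Emitted.onEveryUpdate());
    }

    // New overload: the only API change needed, regardless of window type.
    static WindowedStreamSketch windowedBy(String windowSpec, Emitted emitted) {
        return new WindowedStreamSketch(windowSpec, emitted);
    }
}
```

Adding another emit strategy later would mean one more factory method on `Emitted`, with no change to the `windowedBy` signatures.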

----------------------------

So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it `EmitConfig` is probably better than
`WindowConfig`.
2) I saw in your PR (https://github.com/apache/kafka/pull/11892) that you
are also proposing to add new stores to the public factory Stores, but
that is not included in the KIP. Is that intentional? Personally I think
that although we may eventually want to add a new store type to the public
APIs, for this KIP maybe we do not have to add them but can delay until
after we've learned the best way to lay them out. LMK what you think.



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li <h...@confluent.io.invalid> wrote:

Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there and I'm
looking forward to your feedback.

Thanks,
Hao



--
-- Guozhang





