Thanks for the KIP! I also have some questions/comments:

10) Notation: you use the term "late data" but I think you mean out-of-order. We reserve the term "late" for records that arrive after the grace period has passed, and thus, "late == out-of-order data that is dropped".
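
To make the terminology concrete, here is a minimal sketch in plain Python (not the Kafka Streams API). The function name `classify` and the exact boundary handling (a record exactly at stream time minus grace still counts as within grace) are assumptions for illustration only:

```python
def classify(record_ts, stream_time, grace_ms):
    """Classify a record against the stream time observed before it arrived.

    stream_time is the highest record timestamp seen so far.
    The boundary choice (>= vs >) is an assumption of this sketch.
    """
    if record_ts >= stream_time:
        return "in-order"
    if record_ts >= stream_time - grace_ms:
        # Older than stream time, but still within the grace period:
        # out-of-order, and still processed.
        return "out-of-order"
    # Out-of-order AND the grace period has passed: the record is dropped.
    return "late"
```

With a stream time of 100 and a grace period of 10, a record with timestamp 95 is out-of-order, while a record with timestamp 89 is late.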


20) "There is only one option from the stream side and only recently is there a second option on the table side."

What are those options? Victoria already asked about the table side, but I am also not sure what option you mean for the stream side?


30) "If the table side uses a materialized version store the value is the latest by stream time rather than by offset within its defined grace period."

The phrase "the value is the latest by stream time" is confusing -- in the end, a versioned store holds multiple versions, not just one.
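
As a rough illustration of why "the value" is ambiguous, here is a toy model of a versioned store in plain Python. The class `VersionedStoreSketch` and its methods are hypothetical and only mimic the idea of a timestamped lookup; history retention and tombstones are deliberately left out:

```python
import bisect


class VersionedStoreSketch:
    """Toy versioned store: keeps every version of a key, sorted by timestamp."""

    def __init__(self):
        # key -> (sorted list of timestamps, values in the same order)
        self._versions = {}

    def put(self, key, value, ts):
        timestamps, values = self._versions.setdefault(key, ([], []))
        # Insert at the sorted position, so out-of-order puts are handled.
        i = bisect.bisect_right(timestamps, ts)
        timestamps.insert(i, ts)
        values.insert(i, value)

    def get(self, key, as_of_ts):
        """Return the version that was valid at as_of_ts, or None."""
        timestamps, values = self._versions.get(key, ([], []))
        i = bisect.bisect_right(timestamps, as_of_ts)
        return values[i - 1] if i > 0 else None
```

In this model the table-side value a stream record joins against depends on the stream record's own timestamp, so there is no single "latest" value per key.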


40) I am also wondering about ordering. In general, KS tries to preserve offset-order during processing (with some exceptions, when offset-order preservation is not clearly defined). Given that the stream-side buffer is really just a "linear buffer", we could easily preserve offset-order. But I also see a benefit of re-ordering and emitting out-of-order data right away when read (instead of blocking them behind in-order records that are not ready yet). -- It might even be a possibility to let users pick an emit strategy, e.g., "EmitStrategy.preserveOffsets" (name just a placeholder).

The KIP should explain this in more detail and also discuss different options and mention them in "Rejected alternatives" in case we don't want to include them.
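
The two orderings discussed in 40) can be sketched as follows. This is a plain-Python model under stated assumptions, not the KIP's design: the function `drain`, the `preserve_offsets` flag, and the readiness rule (a record is ready once its timestamp is at or below stream time minus grace) are all hypothetical:

```python
def drain(buffer, stream_time, grace_ms, preserve_offsets):
    """Split a stream-side buffer into (emitted, remaining).

    buffer is a list of (offset, timestamp, value) tuples in offset order.
    A record is treated as ready once timestamp <= stream_time - grace_ms
    (an assumption of this sketch).
    """
    cutoff = stream_time - grace_ms
    if preserve_offsets:
        # Emit only from the head of the buffer; a head record that is not
        # ready blocks every record behind it, even ready ones.
        n = 0
        while n < len(buffer) and buffer[n][1] <= cutoff:
            n += 1
        return buffer[:n], buffer[n:]
    # Timestamp order: emit every ready record, oldest timestamp first,
    # using the offset as a tie-breaker.
    ready = sorted((r for r in buffer if r[1] <= cutoff),
                   key=lambda r: (r[1], r[0]))
    remaining = [r for r in buffer if r[1] > cutoff]
    return ready, remaining
```

For a buffer of `[(0, 120, "b"), (1, 110, "c"), (2, 125, "d")]` with stream time 125 and grace 10, preserving offsets emits nothing because "b" is not ready yet, while the timestamp-ordered variant emits "c" right away.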


50) What happens when users change the grace period? In particular, what happens when they turn it on/off (increasing/decreasing it is also an interesting case)? I think we should try to support this if possible; the "Compatibility" section needs to cover switching on/off in more detail.


-Matthias




On 5/2/23 2:06 PM, Victoria Xia wrote:
Cool KIP, Walker! Thanks for sharing this proposal.

A few clarifications:

1. Is the order in which records exit the buffer necessarily the same as
the order in which they enter it, or no? Based on the description in
the KIP, it sounds like the answer is no, i.e., records will exit the
buffer in increasing timestamp order, which means that they may be reordered
(even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a stream-side
record arrives with a timestamp that is older than the current stream time
minus the grace period? Will this record trigger a join result, or will it
be dropped? Based on the description for what happens when the join grace
period is set to zero, it sounds like the late record will be dropped, even
if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of removing
records from the join buffer? For example, will new records arriving on the
table side of the join cause stream time to advance? From the KIP it sounds
like only stream-side records will advance stream time -- does that mean
that the join processor itself will have to track this stream time?

Also +1 to Lucas's question about what options will be available for
configuring the join buffer. Will users have the option to choose whether
they want the buffer to be in-memory vs persistent?

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Walker,

thanks for the KIP! We definitely need this. I have two questions:

  - Have you considered allowing the customization of the underlying
buffer implementation? As far as I can see, `StreamJoined` lets you
customize the underlying store via a `WindowBytesStoreSupplier`. Would it
make sense for `Joined` to have this as well? I can imagine one may want
to limit the number of records in the buffer, for example. If we hit the
maximum, the only option would be to drop semantic guarantees, but
users may still want to do this.
  - With "second option on the table side" you are referring to
versioned tables, right? Will the buffer on the stream side behave any
differently depending on whether the table side is versioned or not?

Finally, I think a simple example in the motivation section could help
non-experts understand the KIP.

Best,
Lucas

On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hello everybody,

I have a Streams proposal to improve the stream-table join by adding a
grace period and buffer to the stream side of the join, to allow
processing in timestamp order, matching the recent improvements of the
versioned tables.

Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw>
and share your thoughts.

best,
Walker

