Hey all,

Thanks for the comments; they gave me a lot to think about. I'll try to
address them all in order. I have made some updates to the KIP related to
them, and I mention where below.

Lucas

Good idea about the example. I added a simple one.

1) I have thought about including options for the underlying buffer
configuration. One of them might be an in-memory option. My biggest
concern is about the semantic guarantees. This isn't like suppress or
windows, where producing incomplete results is relatively harmless. Here
we could be producing incorrect results. I would also like to keep
the interface changes as simple as I can. I feel that making more than this
change to Joined could make this more complicated than it needs to be. If we
really want to, I could see adding a grace() option with a BufferConfig in
there or something, but I would rather not.

2) The buffer will be independent of whether the table is versioned or not.
If the table is not materialized, it will be materialized as a versioned
table. It might make sense to do a follow-up KIP where we force the
retention period of the versioned table to be greater than the maximum
grace period of the stream buffer.
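A minimal, hypothetical sketch of what such a follow-up check could look like, assuming the two quantities compared are the versioned table's retention period and the largest stream-side grace period joining against it. `RetentionCheck` and its method are illustrative names, not an existing Kafka Streams validation. The intuition: a stream record with timestamp t can wait in the buffer until stream time reaches roughly t plus the grace period, and the table still has to hold the version valid at t when that record is finally joined.

```java
import java.time.Duration;
import java.util.List;

// Hypothetical validation for the follow-up idea: the versioned table must retain
// history for longer than the largest stream-side grace period that joins against it.
final class RetentionCheck {

    static void requireRetentionCoversGrace(Duration tableRetention,
                                            List<Duration> streamGracePeriods) {
        // Find the largest grace period among all stream-side buffers.
        Duration maxGrace = Duration.ZERO;
        for (Duration grace : streamGracePeriods) {
            if (grace.compareTo(maxGrace) > 0) {
                maxGrace = grace;
            }
        }
        // Follows the wording above: retention must be strictly greater.
        if (tableRetention.compareTo(maxGrace) <= 0) {
            throw new IllegalArgumentException("Versioned table retention " + tableRetention
                + " must be greater than the maximum stream-side grace period " + maxGrace);
        }
    }
}
```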

Victoria

1) Yes, records will exit in timestamp order, not in offset order.
2) Late records (records that arrive after the grace period has passed)
will be dropped. From my understanding, that is the point of a grace
period, no? Doesn't the same thing happen with versioned stores?
3) The segment store already has an observed stream time, and we advance
based on that. It should only advance based on records that enter the
store, so yes, only stream-side records. We could maybe make an improvement
later to advance stream time from the table side as well, but that might be
debatable, as we might get more late records. Anyway, I would rather have
that as a separate discussion.
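To make 1) to 3) concrete, here is a minimal, hypothetical Java sketch of the buffering rules as described: records leave in timestamp order, observed stream time advances only on stream-side records, and a record older than stream time minus the grace period is dropped as late. It assumes a record is released once its timestamp is at or before stream time minus the grace period, and that equal timestamps keep arrival order. `GraceBuffer` is an illustrative name; the sketch uses an in-memory priority queue, not the segment store or `TimeOrderedKeyValueBuffer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a stream-side join buffer with a grace period.
// GraceBuffer is an illustrative name, not part of the Kafka Streams API.
final class GraceBuffer {

    // A buffered stream-side record; seq records arrival order to break timestamp ties.
    static final class BufferedRecord {
        final String key;
        final String value;
        final long timestamp;
        final long seq;

        BufferedRecord(String key, String value, long timestamp, long seq) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.seq = seq;
        }
    }

    private final long graceMs;
    // Ordered by timestamp; equal timestamps keep their arrival order.
    private final PriorityQueue<BufferedRecord> queue = new PriorityQueue<>(
        (a, b) -> a.timestamp != b.timestamp
            ? Long.compare(a.timestamp, b.timestamp)
            : Long.compare(a.seq, b.seq));
    private final List<BufferedRecord> dropped = new ArrayList<>();
    private long observedStreamTime = Long.MIN_VALUE;
    private long nextSeq = 0;

    GraceBuffer(long graceMs) {
        this.graceMs = graceMs;
    }

    // Accepts one stream-side record and returns the records that are now ready to be
    // joined, in timestamp order. This sketch only models the stream side, so only
    // stream-side records advance observed stream time.
    List<BufferedRecord> put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        long cutoff = observedStreamTime - graceMs;
        BufferedRecord rec = new BufferedRecord(key, value, timestamp, nextSeq++);
        if (timestamp < cutoff) {
            dropped.add(rec); // late: the grace period for this timestamp has already passed
            return new ArrayList<>();
        }
        queue.add(rec);
        List<BufferedRecord> ready = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= cutoff) {
            ready.add(queue.poll());
        }
        return ready;
    }

    List<BufferedRecord> dropped() {
        return dropped;
    }
}
```

For example, with a 10 ms grace period, records stamped 100 and 95 wait in the buffer; a record stamped 112 then releases 95 before 100 (timestamp order, not arrival order), and a later record stamped 101 is dropped as late because stream time minus grace is already 102.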

In-memory option? We can do that. For the buffer I plan to use the
TimeOrderedKeyValueBuffer interface, which already has an in-memory
implementation, so it would be simple.

I said more in my answer to Lucas's question. My concern with buffer
configs or an in-memory option is complicating the interface. There are
also the semantic guarantees, but an in-memory buffer shouldn't affect those.

Matthias

1) I fixed the out-of-order vs. late terminology in the KIP.

2) I was referring to the stream side itself: after this KIP we can have a
buffered stream or a normal one. For the table we can use a versioned table
or a normal table.

3) Good call-out. I clarified this as "If the table side uses a materialized
versioned store, it can store multiple versions of each record within its
defined grace period." and modified the rest of the paragraph a bit.

4) I see the value of preserving offset ordering, but if the stream is
buffered to join on timestamp instead of offset, doesn't that already
suggest we care more about time in this case?

If we end up adding more options, it might make sense to do this. Maybe
offset-order processing can be a follow-up?

I'll add a section for this in Rejected Alternatives. I think it makes
sense to do something like this, but maybe in a follow-up.
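A small, hypothetical Java sketch of the trade-off Matthias describes. Both variants buffer the same records under the same grace rule, but one releases them in timestamp order while the other releases them in arrival (offset) order, standing in for something like the placeholder `EmitStrategy.preserveOffsets`. In the offset-order variant, a record that is already ready can be held back behind an earlier-arriving record that is not. The method name and the release rule (timestamp at or before stream time minus grace) are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical comparison of two emit strategies for the stream-side buffer.
final class EmitStrategies {

    // Feeds the arrival timestamps through a buffer with the given grace period and
    // returns, for each arrival, the timestamps released at that step.
    // preserveOffsets = true  -> FIFO: records leave in arrival (offset) order.
    // preserveOffsets = false -> min-heap: records leave in timestamp order.
    static List<List<Long>> emissions(long[] arrivals, long graceMs, boolean preserveOffsets) {
        Queue<Long> buffer = preserveOffsets ? new ArrayDeque<Long>() : new PriorityQueue<Long>();
        List<List<Long>> steps = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivals) {
            streamTime = Math.max(streamTime, ts);
            long cutoff = streamTime - graceMs;
            if (ts >= cutoff) {
                buffer.add(ts); // records older than the cutoff are late and dropped
            }
            List<Long> released = new ArrayList<>();
            // In FIFO mode a head that is not ready yet blocks everything behind it.
            while (!buffer.isEmpty() && buffer.peek() <= cutoff) {
                released.add(buffer.poll());
            }
            steps.add(released);
        }
        return steps;
    }
}
```

With a 10 ms grace period and arrivals stamped 100, 95, 106, 112, the timestamp-order variant releases 95 on the third arrival and 100 on the fourth, while the offset-order variant releases nothing until the fourth arrival and then emits 100 followed by 95, because 95 is stuck behind 100.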

5) I hadn't thought about this. I suppose if the grace period were changed
during an upgrade, the next record would either evict a lot of records (if
the grace period decreased) or there would be a pause until the new grace
period is reached (if it increased). Increasing it is a bit more
problematic, especially if the table grace period and retention time stay
the same. If the data is reprocessed after a change like that, there would
be different results, but I feel that would be expected after such a change.
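A hypothetical back-of-the-envelope sketch of the two upgrade cases, assuming a record is released once stream time reaches its timestamp plus the grace period and that the buffered records survive the restart unchanged. `GraceChange` and both helpers are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers for reasoning about a grace-period change across a restart.
final class GraceChange {

    // Decreasing the grace: the first stream-side record after the restart releases every
    // restored record whose timestamp is at or before the new cutoff, in timestamp order.
    static List<Long> releasedByNextRecord(List<Long> restored, long streamTime,
                                           long nextTimestamp, long newGraceMs) {
        long cutoff = Math.max(streamTime, nextTimestamp) - newGraceMs;
        List<Long> released = new ArrayList<>();
        for (long ts : restored) {
            if (ts <= cutoff) {
                released.add(ts);
            }
        }
        Collections.sort(released);
        return released;
    }

    // Increasing the grace: nothing is released until stream time reaches the oldest
    // restored timestamp plus the new grace period.
    static long streamTimeOfFirstRelease(List<Long> restored, long newGraceMs) {
        return Collections.min(restored) + newGraceMs;
    }
}
```

For instance, with records 91, 94, 97 and 100 buffered at stream time 100 under a 10 ms grace, dropping the grace to 2 ms means the next record at 101 releases 91, 94 and 97 at once. Raising it to 30 ms means nothing is released until stream time 121, where the old setting would have released 91 at 101.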

What do you think should happen?

Hopefully this answers your questions!

Walker

On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for the KIP! Also some question/comments from my side:
>
> 10) Notation: you use the term "late data" but I think you mean
> out-of-order. We reserve the term "late" for records that arrive after
> grace period passed, and thus, "late == out-of-order data that is dropped".
>
>
> 20) "There is only one option from the stream side and only recently is
> there a second option on the table side."
>
> What are those options? Victoria already asked about the table side, but
> I am also not sure what option you mean for the stream side?
>
>
> 30) "If the table side uses a materialized version store the value is
> the latest by stream time rather than by offset within its defined grace
> period."
>
> The phrase "the value is the latest by stream time" is confusing -- in
> the end, a versioned store stores multiple versions, not just one.
>
>
> 40) I am also wondering about ordering. In general, KS tries to preserve
> offset-order during processing (with some exceptions, when offset order
> preservation is not clearly defined). Given that the stream-side buffer
> is really just a "linear buffer", we could easily preserve offset-order.
> But I also see a benefit of re-ordering and emitting out-of-order data
> right away when read (instead of blocking them behind in-order records
> that are not ready yet). -- It might even be possible to let users
> pick an emit strategy, e.g., "EmitStrategy.preserveOffsets" (name just a
> placeholder).
>
> The KIP should explain this in more detail and also discuss different
> options and mention them in "Rejected alternatives" in case we don't
> want to include them.
>
>
> 50) What happens when users change the grace period? Especially, when
> they turn it on/off (but also increasing/decreasing is an interesting
> point)? I think we should try to support this if possible; the
> "Compatibility" section needs to cover switching on/off in more detail.
>
>
> -Matthias
>
>
>
>
> On 5/2/23 2:06 PM, Victoria Xia wrote:
> > Cool KIP, Walker! Thanks for sharing this proposal.
> >
> > A few clarifications:
> >
> > 1. Is the order that records exit the buffer in necessarily the same as
> > the order that records enter the buffer in, or no? Based on the
> > description in the KIP, it sounds like the answer is no, i.e., records
> > will exit the buffer in increasing timestamp order, which means that they
> > may be reordered (even for the same key) compared to the input order.
> >
> > 2. What happens if the join grace period is nonzero, and a stream-side
> > record arrives with a timestamp that is older than the current stream
> > time minus the grace period? Will this record trigger a join result, or
> > will it be dropped? Based on the description for what happens when the
> > join grace period is set to zero, it sounds like the late record will be
> > dropped, even if the join grace period is nonzero. Is that true?
> >
> > 3. What could cause stream time to advance, for purposes of removing
> > records from the join buffer? For example, will new records arriving on
> > the table side of the join cause stream time to advance? From the KIP it
> > sounds like only stream-side records will advance stream time -- does
> > that mean that the join processor itself will have to track this stream
> > time?
> >
> > Also +1 to Lucas's question about what options will be available for
> > configuring the join buffer. Will users have the option to choose whether
> > they want the buffer to be in-memory vs persistent?
> >
> > - Victoria
> >
> > On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
> > <lbruts...@confluent.io.invalid> wrote:
> >
> >> Hi Walker,
> >>
> >> thanks for the KIP! We definitely need this. I have two questions:
> >>
> >>   - Have you considered allowing the customization of the underlying
> >> buffer implementation? As far as I can see, `StreamJoined` lets you customize
> >> the underlying store via a `WindowStoreSupplier`. Would it make sense
> >> for `Joined` to have this as well? I can imagine one may want to limit
> >> the number of records in the buffer, for example. If we hit the
> >> maximum, the only option would be to drop semantic guarantees, but
> >> users may still want to do this.
> >>   - With "second option on the table side" you are referring to
> >> versioned tables, right? Will the buffer on the stream side behave any
> >> differently depending on whether the table side is versioned or not?
> >>
> >> Finally, I think a simple example in the motivation section could help
> >> non-experts understand the KIP.
> >>
> >> Best,
> >> Lucas
> >>
> >> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
> >> <wcarl...@confluent.io.invalid> wrote:
> >>>
> >>> Hello everybody,
> >>>
> >>> I have a Streams proposal to improve the stream-table join by adding a
> >>> grace period and buffer to the stream side of the join to allow
> >>> processing in timestamp order, matching the recent improvements of the
> >>> versioned tables.
> >>>
> >>> Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw> and
> >>> share your thoughts.
> >>>
> >>> best,
> >>> Walker
> >>
> >
>
