Thanks for the updates, Walker! Looks great, though I do have a couple
questions about the latest updates:

   1. The new example says that without stream-side buffering, "ex" and
   "fy" are possible join results. How could those join results happen? The
   example versioned table suggests that table record "x" has timestamp 2, and
   table record "y" has timestamp 3. If stream record "e" has timestamp 1,
   then it can never be joined against record "x", and similarly for stream
   record "f" with timestamp 2 being joined against "y".
   2. I see in your replies above that "If table is not materialized it
   will materialize it as versioned" but I don't see this called out in the
   KIP -- seems worth calling out. Also, what will the history retention for
   the versioned table be? Will it be the same as the join grace period, or
   will it be greater?
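
To make the first question concrete, here is a minimal sketch of how I understand a timestamped lookup to behave for a single key: the lookup returns the latest table version whose timestamp is at or before the stream record's timestamp. The class and method names are hypothetical and exist only for illustration; this is not the actual versioned store API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of a versioned table, for illustration only.
class VersionedLookupSketch {
    // Versions of one key, indexed by the timestamp at which each becomes valid.
    private final TreeMap<Long, String> versions = new TreeMap<>();

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    // Returns the latest version with timestamp <= asOf, or null if there is none.
    String get(long asOf) {
        Map.Entry<Long, String> entry = versions.floorEntry(asOf);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedLookupSketch table = new VersionedLookupSketch();
        table.put(2L, "x");
        table.put(3L, "y");
        // Stream record "e" with timestamp 1 precedes "x", so there is no match.
        System.out.println("e@1 -> " + table.get(1L));
        // Stream record "f" with timestamp 2 sees "x", never "y".
        System.out.println("f@2 -> " + table.get(2L));
    }
}
```

Under this model "e" joins against nothing and "f" joins against "x", which is why "ex" and "fy" look unreachable to me.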


And some additional thoughts:

Sounds like there are a few things users should watch out for when enabling
the stream-side buffer:

   - Records will get "stuck" if there are no newer records to advance
   stream time.
   - If there are large gaps between the timestamps of stream-side records,
   then it's possible that versioned store history retention will have expired
   by the time a record is evicted from the join buffer, leading to a join
   "miss." For example, if the join grace period and table history retention
   are both 10, and records come in the order:

   table side t0 with ts=0
   stream side s1 with ts=1 <-- enters buffer
   table side t10 with ts=10
   table side t20 with ts=20
   stream side s21 with ts=21 <-- evicts record s1 from buffer, but
   versioned store no longer contains data for ts=1 due to history retention
   having elapsed

   This will result in the join result (s1, null) even though it should've
   been (s1, t0), due to t0 having been expired from the versioned store
   already.
   - Out-of-order records from the stream-side will be reordered, and late
   records will be dropped.

I don't think any of these are reasons to not go forward with this KIP, but
it'd be good to call them out in the eventual documentation to decrease the
chance users get tripped up.
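
In case it helps for the docs, the join "miss" scenario above can be sketched as a small single-key simulation. This is only a model under my own assumptions: timestamps are non-negative, a buffered record is evicted once its timestamp is at or before stream-side stream time minus the join grace period, and a table lookup older than table-side time minus history retention returns null. All names are hypothetical, and dropping of late records is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical single-key model; not the actual Kafka Streams join processor.
class JoinMissSketch {
    static final class Buffered {
        final String value;
        final long ts;
        Buffered(String value, long ts) { this.value = value; this.ts = ts; }
    }

    private final long joinGrace;
    private final long historyRetention;
    private final TreeMap<Long, String> tableVersions = new TreeMap<>();
    // Stream-side buffer ordered by timestamp, not by arrival order.
    private final PriorityQueue<Buffered> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Buffered b) -> b.ts));
    private long tableTime = 0L;   // advanced only by table-side records
    private long streamTime = 0L;  // advanced only by stream-side records
    final List<String> results = new ArrayList<>();

    JoinMissSketch(long joinGrace, long historyRetention) {
        this.joinGrace = joinGrace;
        this.historyRetention = historyRetention;
    }

    void onTable(String value, long ts) {
        tableVersions.put(ts, value);
        tableTime = Math.max(tableTime, ts);
    }

    void onStream(String value, long ts) {
        streamTime = Math.max(streamTime, ts);
        buffer.add(new Buffered(value, ts));
        // Evict every record whose grace period has elapsed, oldest first.
        while (!buffer.isEmpty() && buffer.peek().ts <= streamTime - joinGrace) {
            Buffered r = buffer.poll();
            results.add("(" + r.value + ", " + lookup(r.ts) + ")");
        }
    }

    private String lookup(long asOf) {
        // Reads older than the history retention window find nothing.
        if (asOf < tableTime - historyRetention) {
            return null;
        }
        Map.Entry<Long, String> e = tableVersions.floorEntry(asOf);
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        JoinMissSketch join = new JoinMissSketch(10L, 10L);
        join.onTable("t0", 0L);
        join.onStream("s1", 1L);   // enters buffer
        join.onTable("t10", 10L);
        join.onTable("t20", 20L);
        join.onStream("s21", 21L); // evicts s1, but ts=1 is out of retention
        System.out.println(join.results); // prints [(s1, null)]
    }
}
```

Running the same sequence with a much larger history retention (say 100) yields (s1, t0) instead, which is the result one would intuitively expect.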

> We could maybe do an improvement later to advance stream time from table
side as well, but that might be debatable as we might get more late records.

Yes, the likelihood of late records increases but also the likelihood of
"join misses" due to versioned store history retention having elapsed
decreases, which feels important for certain use cases. Either way, agreed
that it can be a discussion for the future as incorporating this would
substantially complicate the implementation.

Also a couple nits:

   - The KIP currently says "We recently added versioned tables which allow
   the table side of the a join [...] but it is not taken advantage of in
   joins," but this doesn't seem true? If the table of a stream-table join is
   versioned, then the DSL's stream-table join processor will automatically
   perform timestamped lookups into the table, in order to take advantage of
   the new timestamp-aware store to provide better join semantics.
   - The KIP mentions "grace period" for versioned stores in a number of
   places but I think you actually mean "history retention"? The two happen to
   be the same today (it is not an option for users to configure the two
   separately) but this need not be true in the future. "History retention"
   governs how far back in time reads may occur, which is the relevant
   parameter for performing lookups as part of the stream-table join. "Grace
   period" in the context of versioned stores refers to how far back in time
   out-of-order writes may occur, which probably isn't directly relevant for
   introducing a stream-side buffer, though it's also possible I've overlooked
   something. (As a bonus, switching from "table grace period" in the KIP to
   "table history retention" also helps to clarify/distinguish that it's a
   different parameter from the "join grace period," which I could see being
   confusing to readers. :) )
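
To illustrate the distinction in the second nit, here is a rough sketch that treats the two parameters as separate bounds relative to the store's observed stream time. It assumes, purely for illustration, that the two could be configured independently (which is not possible today), and the class and method names are made up.

```java
// Hypothetical model of the two bounds on a versioned store.
class VersionedStoreBounds {
    private final long historyRetention; // how far back reads may go
    private final long gracePeriod;      // how far back out-of-order writes may go
    private long observedStreamTime = 0L;

    VersionedStoreBounds(long historyRetention, long gracePeriod) {
        this.historyRetention = historyRetention;
        this.gracePeriod = gracePeriod;
    }

    void advance(long ts) {
        observedStreamTime = Math.max(observedStreamTime, ts);
    }

    // Relevant for stream-table join lookups.
    boolean canServeRead(long asOf) {
        return asOf >= observedStreamTime - historyRetention;
    }

    // Relevant for out-of-order table updates.
    boolean acceptsWrite(long ts) {
        return ts >= observedStreamTime - gracePeriod;
    }

    public static void main(String[] args) {
        VersionedStoreBounds store = new VersionedStoreBounds(10L, 5L);
        store.advance(20L);
        System.out.println("read as of 12:  " + store.canServeRead(12L)); // true
        System.out.println("write at ts 12: " + store.acceptsWrite(12L)); // false
    }
}
```

The join only performs reads, so the first bound is the one that matters for the stream-side buffer.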


Cheers,
Victoria

On Thu, May 18, 2023 at 1:43 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

> Hey all,
>
> Thanks for the comments, they gave me a lot to think about. I'll try to
> address them all in order. I have made some updates to the kip related to
> them, and I mention where below.
>
> Lucas
>
> Good idea about the example. I added a simple one.
>
> 1) I have thought about including options for the underlying buffer
> configuration. One of which might be adding an in memory option. My biggest
> concern is about the semantic guarantees. This isn't like suppress or
> windows, where producing incomplete results is relatively harmless. Here
> we would be possibly producing incorrect results. I also would like to keep
> the interface changes as simple as I can. Making more than this change to
> Joined I feel could make this more complicated than it needs to be. If we
> really want to I could see adding a grace() option with a BufferConfig in
> there or something, but I would rather not.
>
> 2) The buffer will be independent of whether the table is versioned or not. If
> table is not materialized it will materialize it as versioned. It might
> make sense to do a follow up kip where we force the retention period of
> the versioned table to be greater than whatever the max of the stream buffer is.
>
> Victoria
>
> 1) Yes, records will exit in timestamp order not in offset order.
> 2) Late records will be dropped (late meaning outside the grace period). From my
> understanding that is the point of a grace period, no? Doesn't the same
> thing happen with versioned stores?
> 3) The segment store already has an observed stream time, we advance based
> on that. That should only advance based on records that enter the store. So
> yes, only stream side records. We could maybe do an improvement later to
> advance stream time from table side as well, but that might be debatable as
> we might get more late records. Anyways I would rather have that as a
> separate discussion.
>
> in memory option? We can do that, for the buffer I plan to use the
> TimeOrderedKeyValueBuffer interface which already has an in memory
> implementation, so it would be simple.
>
> I said more in my answer to Lucas's question. The concern I have with
> buffer configs or in memory is complicating the interface. Semantic
> guarantees are also a concern, but in memory shouldn't affect that.
>
> Matthias
>
> 1) fixed out of order vs late terminology in the kip.
>
> 2) I was referring to having a stream. So after this kip we can have a
> buffered stream or a normal one. For the table we can use a versioned table
> or a normal table.
>
> 3) Good call out. I clarified this as "If the table side uses a materialized
> version store, it can store multiple versions of each record within its
> defined grace period." and modified the rest of the paragraph a bit.
>
> 4) I get the preserving of offset ordering, but if the stream is buffered
> to join on timestamp instead of offset doesn't it already seem like we care
> more about time in this case?
>
> If we end up adding more options it might make sense to do this. Maybe
> offset order processing can be a follow up?
>
> I'll add a section for this in Rejected Alternatives. I think it makes
> sense to do something like this but maybe in a follow up.
>
> 5) I hadn't thought about this. I suppose if they changed this in an
> upgrade the next record would either evict a lot of records (if the grace
> period decreased) or there would be a pause until the new grace period was
> reached. Increasing is a bit more problematic, especially if the table
> grace period and retention time stay the same. If the data is reprocessed
> after a change like that then there would be different results, but I feel
> like that would be expected after such a change.
>
> What do you think should happen?
>
> Hopefully this answers your questions!
>
> Walker
>
> On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for the KIP! Also some question/comments from my side:
> >
> > 10) Notation: you use the term "late data" but I think you mean
> > out-of-order. We reserve the term "late" for records that arrive after
> > grace period passed, and thus, "late == out-of-order data that is
> > dropped".
> >
> >
> > 20) "There is only one option from the stream side and only recently is
> > there a second option on the table side."
> >
> > What are those options? Victoria already asked about the table side, but
> > I am also not sure what option you mean for the stream side?
> >
> >
> > 30) "If the table side uses a materialized version store the value is
> > the latest by stream time rather than by offset within its defined grace
> > period."
> >
> > The phrase "the value is the latest by stream time" is confusing -- in
> > the end, a versioned store holds multiple versions, not just one.
> >
> >
> > 40) I am also wondering about ordering. In general, KS tries to preserve
> > offset-order during processing (with some exception, when offset order
> > preservation is not clearly defined). Given that the stream-side buffer
> > is really just a "linear buffer", we could easily preserve offset-order.
> > But I also see a benefit of re-ordering and emitting out-of-order data
> > right away when read (instead of blocking them behind in-order records
> > that are not ready yet). -- It might even be a possibility, to let users
> > pick an emit strategy, e.g. "EmitStrategy.preserveOffsets" (name just a
> > placeholder).
> >
> > The KIP should explain this in more detail and also discuss different
> > options and mention them in "Rejected alternatives" in case we don't
> > want to include them.
> >
> >
> > 50) What happens when users change the grace period? Especially, when
> > they turn it on/off (but also increasing/decreasing is an interesting
> > point)? I think we should try to support this if possible; the
> > "Compatibility" section needs to cover switching on/off in more detail.
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 5/2/23 2:06 PM, Victoria Xia wrote:
> > > Cool KIP, Walker! Thanks for sharing this proposal.
> > >
> > > A few clarifications:
> > >
> > > 1. Is the order that records exit the buffer in necessarily the same as
> > > the order that records enter the buffer in, or no? Based on the description
> > > in the KIP, it sounds like the answer is no, i.e., records will exit the
> > > buffer in increasing timestamp order, which means that they may be
> > > reordered (even for the same key) compared to the input order.
> > >
> > > 2. What happens if the join grace period is nonzero, and a stream-side
> > > record arrives with a timestamp that is older than the current stream
> > > time minus the grace period? Will this record trigger a join result, or will
> > > it be dropped? Based on the description for what happens when the join
> > > grace period is set to zero, it sounds like the late record will be dropped,
> > > even if the join grace period is nonzero. Is that true?
> > >
> > > 3. What could cause stream time to advance, for purposes of removing
> > > records from the join buffer? For example, will new records arriving on
> > > the table side of the join cause stream time to advance? From the KIP it
> > > sounds like only stream-side records will advance stream time -- does that
> > > mean that the join processor itself will have to track this stream time?
> > >
> > > Also +1 to Lucas's question about what options will be available for
> > > configuring the join buffer. Will users have the option to choose
> > > whether they want the buffer to be in-memory vs persistent?
> > >
> > > - Victoria
> > >
> > > On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
> > > <lbruts...@confluent.io.invalid> wrote:
> > >
> > >> Hi Walker,
> > >>
> > >> thanks for the KIP! We definitely need this. I have two questions:
> > >>
> > >>   - Have you considered allowing the customization of the underlying
> > >> buffer implementation? As I can see, `StreamJoined` lets you customize
> > >> the underlying store via a `WindowStoreSupplier`. Would it make sense
> > >> for `Joined` to have this as well? I can imagine one may want to limit
> > >> the number of records in the buffer, for example. If we hit the
> > >> maximum, the only option would be to drop semantic guarantees, but
> > >> users may still want to do this.
> > >>   - With "second option on the table side" you are referring to
> > >> versioned tables, right? Will the buffer on the stream side behave any
> > >> different whether the table side is versioned or not?
> > >>
> > >> Finally, I think a simple example in the motivation section could help
> > >> non-experts understand the KIP.
> > >>
> > >> Best,
> > >> Lucas
> > >>
> > >> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
> > >> <wcarl...@confluent.io.invalid> wrote:
> > >>>
> > >>> Hello everybody,
> > >>>
> > >>> I have a stream proposal to improve the stream table join by adding a
> > >>> grace period and buffer to the stream side of the join to allow processing
> > >>> in timestamp order matching the recent improvements of the versioned
> > >>> tables.
> > >>>
> > >>> Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw>
> > >>> and share your thoughts.
> > >>>
> > >>> best,
> > >>> Walker
> > >>
> > >
> >
>
