Walker,

thanks for the updates. The KIP itself reads fine (of course Victoria made good comments about some phrases), but there are a couple of things in your latest reply that I don't understand, and that I still think need more discussion.

Lucas asked about an in-memory option and a `WindowStoreSupplier`, and you mention "semantic concerns". The underlying buffer implementation should not make any semantic difference, so I am not sure what you mean here (the relationship to suppress() is also unclear to me). That said, I am ok with not making it configurable for now. We can always do it via a follow-up KIP, and keep interface changes limited for now.

Does it really make sense to allow a grace period if the table is non-versioned? You also say: "If table is not materialized it will materialize it as versioned." What history retention time would we pick in this case (Victoria asked this too)? Or should we rather not support this, and force the user to materialize the table explicitly, thus explicitly picking a history retention time? It's a tradeoff between usability and making users aware that there will be a significant impact on disk usage. There are also compatibility concerns: if the table is not explicitly materialized in the old program, we already materialize it internally in the old program (using a non-versioned store, of course). Thus, if somebody adds a grace period, we cannot just switch the store type, as that would be a breaking change, potentially requiring an application reset, or following the upgrade path for versioned state stores and also changing the program to explicitly materialize using a versioned store. Also note that we might not materialize the actual join table, but only an upstream table, and use `ValueGetter` to access the upstream data.

To this end, as you already mentioned, the history retention of the table should be at least the grace period. You proposed to include this in a follow-up KIP, but I am wondering if it's a fundamental requirement, and thus we should put a check in place right away and reject an invalid configuration? (It is always easier to lift restrictions than to introduce them later.) This would also imply that a non-versioned table cannot be supported, because it does not have a history retention that is larger than the grace period. It may also answer the question about materialization: since we already always materialize something on the table side as a non-versioned store right now, it seems difficult to migrate that store to a versioned store. I.e., it might be ok to push the burden onto the user and say: if you start using a grace period, you also need to manually switch from non-versioned to versioned KTables. Doing this automatically under the hood is very complex in this case, so if we push the burden onto the user, we can avoid complicating this KIP significantly.

To summarize the last two paragraphs, I would propose to:
 - not support non-versioned KTables
 - if a grace period is added, require users to explicitly materialize the table as versioned (either directly, or upstream; upstream only works if downstream tables "inherit" versioned semantics, cf. KIP-914)
 - require the table's history retention time to be larger than the grace period (should be easy to check at runtime, when we build the topology)
 - because switching from non-versioned to versioned stores is not backward compatible (cf. KIP-914), users need to take care of this themselves, and this also implies that adding a grace period is not a backward compatible change (even if only via indirect means)
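A minimal sketch of the topology-build-time check proposed above. It assumes that a grace period of zero keeps today's behavior for any table type. The thread states the bound both as "at least" and "larger than" the grace period, and this sketch uses "at least". `GracePeriodValidator` and `validate()` are hypothetical names, not Kafka Streams APIs.

```java
import java.time.Duration;

// Hypothetical sketch: GracePeriodValidator is NOT a Kafka Streams API.
final class GracePeriodValidator {

    private GracePeriodValidator() { }

    /**
     * Rejects a stream-table join whose grace period the table cannot serve.
     *
     * @param gracePeriod      join grace period configured on the stream side
     * @param versioned        whether the (possibly upstream) table store is versioned
     * @param historyRetention history retention of that store (only checked if versioned)
     */
    static void validate(Duration gracePeriod, boolean versioned, Duration historyRetention) {
        if (gracePeriod.isZero()) {
            return; // assumption: no buffering, so any table type works as today
        }
        if (!versioned) {
            throw new IllegalArgumentException(
                "A join grace period requires the table to be explicitly materialized as versioned");
        }
        if (historyRetention.compareTo(gracePeriod) < 0) {
            throw new IllegalArgumentException("History retention " + historyRetention
                + " must be at least the join grace period " + gracePeriod);
        }
    }

    public static void main(String[] args) {
        validate(Duration.ofMinutes(5), true, Duration.ofMinutes(10)); // accepted
        try {
            validate(Duration.ofMinutes(5), false, Duration.ZERO);     // non-versioned table
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```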

About dropping late records: I am wondering whether we should never drop a stream-side record for a left join, even if it's late? In general, one thing I have observed over the years is that it's easier to keep data and let users filter explicitly downstream (or make it configurable) than to drop it proactively, because users have no good way to resurrect records that were already dropped.

For ordering, it sounds reasonable to me to start with only one implementation, and maybe make it configurable as a follow-up. However, I am wondering if starting with offset order might be the better option, as it seems to align more with what we do so far? So instead of storing records ordered by timestamp, we can just store them ordered by offset, and still "poll" from the buffer based on the head record's timestamp. Or would this complicate the implementation significantly?
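The following small model makes the offset-order vs. timestamp-order tradeoff concrete. It assumes a buffered record is emitted once its timestamp is at most stream time minus the grace period, and that late records are dropped. `OrderingDemo` and `run()` are hypothetical, and this is not the actual Kafka Streams buffer. With a grace period of 5 and input timestamps 8, 4, 11 (in offset order), the offset-ordered buffer keeps 4 blocked behind 8, while the timestamp-ordered buffer emits 4 right away.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical model of the stream-side join buffer; not Kafka Streams code.
final class OrderingDemo {

    /** A buffered stream record; offset is its arrival position in the partition. */
    static final class Rec {
        final long offset;
        final long timestamp;
        Rec(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    /** Feeds timestamps (given in offset order) through a buffer; returns emitted timestamps. */
    static List<Long> run(long[] timestamps, long grace, boolean offsetOrder) {
        Queue<Rec> buffer = offsetOrder
            ? new ArrayDeque<Rec>()                                              // FIFO = offset order
            : new PriorityQueue<Rec>(Comparator.comparingLong((Rec r) -> r.timestamp)); // timestamp order
        List<Long> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            streamTime = Math.max(streamTime, ts);
            if (ts < streamTime - grace) {
                continue; // late: outside the grace period, dropped in this model
            }
            buffer.add(new Rec(i, ts));
            // "Poll" based on the head record's timestamp only.
            while (!buffer.isEmpty() && buffer.peek().timestamp <= streamTime - grace) {
                emitted.add(buffer.poll().timestamp);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] input = {8, 4, 11};
        System.out.println(run(input, 5, true));  // [] : 4 is blocked behind 8
        System.out.println(run(input, 5, false)); // [4]
    }
}
```

In the offset-ordered variant, polling on the head record's timestamp means an out-of-order record can wait behind an earlier-offset record that is not ready yet. That is the price of preserving offset order.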

I also think it's ok to not "sync" stream-time between the table and the stream in this KIP, but we should consider doing this as a follow-up change (not sure if we would need a KIP for a change like this).

About increasing/decreasing the grace period: what you describe makes sense to me. If decreased, the next record would just trigger emitting a lot of records, and if increased, the buffer would just need to "fill up" again. For reprocessing, getting a different result with a different grace period is expected, so that's ok IMHO. There seems to be one special corner case: a grace period of zero. In this case, we actually don't need any store, and the stream side could be stateless. I think it can have the same behavior, but if we want to "add / remove" the store dynamically, we need to add specific code for it. For example, even if we start up with a grace period of zero, we would need to check if there is a local store, and still emit everything in it, before we can ditch the store (not sure if that's even easily done at all). Or we would need to have a store for _all_ cases, even if the grace period is zero (the store would be empty all the time, though), to avoid super complex code?
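Here is a rough model of the grace-period changes discussed above. It assumes the buffer contents and stream time survive a restart and only the configured grace period changes. `GraceChangeDemo` and `restartWithGrace()` are hypothetical. After the grace period drops to zero, the next record flushes the whole buffer, and from then on the buffer stays empty, which corresponds to the "store for all cases" option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model: buffer and stream time survive a restart; only the grace period changes.
final class GraceChangeDemo {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // timestamps, smallest first
    private long streamTime = Long.MIN_VALUE;
    private long grace;

    GraceChangeDemo(long grace) { this.grace = grace; }

    /** Simulates restarting with a new grace period but the same local store contents. */
    void restartWithGrace(long newGrace) { this.grace = newGrace; }

    /** Processes one stream-side record and returns the timestamps it evicts for joining. */
    List<Long> process(long ts) {
        List<Long> evicted = new ArrayList<>();
        streamTime = Math.max(streamTime, ts);
        if (ts < streamTime - grace) {
            return evicted; // late record: dropped in this model
        }
        buffer.add(ts);
        while (!buffer.isEmpty() && buffer.peek() <= streamTime - grace) {
            evicted.add(buffer.poll());
        }
        return evicted;
    }

    int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        GraceChangeDemo d = new GraceChangeDemo(10);
        for (long ts = 1; ts <= 5; ts++) {
            d.process(ts);                 // all five stay buffered
        }
        d.restartWithGrace(0);             // grace period decreased on upgrade
        System.out.println(d.process(6));  // [1, 2, 3, 4, 5, 6]
        System.out.println(d.buffered());  // 0
    }
}
```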


-Matthias


On 5/25/23 10:53 AM, Lucas Brutschy wrote:
Hi Walker,

thanks for your responses. That makes sense. I guess there is always
the option to make the implementation more configurable later on, if
users request it. Also thanks for the clarifications. From my side,
the KIP is good to go.

Cheers,
Lucas

On Wed, May 24, 2023 at 11:54 PM Victoria Xia
<victoria....@confluent.io.invalid> wrote:

Thanks for the updates, Walker! Looks great, though I do have a couple of
questions about the latest updates:

    1. The new example says that without stream-side buffering, "ex" and
    "fy" are possible join results. How could those join results happen? The
    example versioned table suggests that table record "x" has timestamp 2, and
    table record "y" has timestamp 3. If stream record "e" has timestamp 1,
    then it can never be joined against record "x", and similarly for stream
    record "f" with timestamp 2 being joined against "y".
    2. I see in your replies above that "If table is not materialized it
    will materialize it as versioned" but I don't see this called out in the
    KIP -- seems worth calling out. Also, what will the history retention for
    the versioned table be? Will it be the same as the join grace period, or
    will it be greater?
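Victoria's first point can be checked with a single-key as-of lookup. This assumes the versioned table holds only "x" at timestamp 2 and "y" at timestamp 3. A versioned lookup returns the latest version whose timestamp is at or before the stream record's timestamp. Here `TreeMap.floorEntry()` stands in for that; it is not the Kafka Streams `VersionedKeyValueStore` API, and `AsOfLookupDemo` is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of a versioned-table lookup; not the Kafka Streams API.
final class AsOfLookupDemo {

    /** Returns the latest version with timestamp <= asOf, or null if there is none. */
    static String lookup(TreeMap<Long, String> versions, long asOf) {
        Map.Entry<Long, String> e = versions.floorEntry(asOf);
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> table = new TreeMap<>();
        table.put(2L, "x");
        table.put(3L, "y");
        System.out.println(lookup(table, 1)); // null: "e" (ts=1) cannot see "x" (ts=2)
        System.out.println(lookup(table, 2)); // x: "f" (ts=2) sees "x", never "y" (ts=3)
    }
}
```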


And some additional thoughts:

Sounds like there are a few things users should watch out for when enabling
the stream-side buffer:

    - Records will get "stuck" if there are no newer records to advance
    stream time.
    - If there are large gaps between the timestamps of stream-side records,
    then it's possible that versioned store history retention will have expired
    by the time a record is evicted from the join buffer, leading to a join
    "miss." For example, if the join grace period and table history retention
    are both 10, and records come in the order:

    table side t0 with ts=0
    stream side s1 with ts=1 <-- enters buffer
    table side t10 with ts=10
    table side t20 with ts=20
    stream side s21 with ts=21 <-- evicts record s1 from buffer, but
    versioned store no longer contains data for ts=1 due to history retention
    having elapsed

    This will result in the join result (s1, null) even though it should've
    been (s1, t0), due to t0 having been expired from the versioned store
    already.
    - Out-of-order records from the stream-side will be reordered, and late
    records will be dropped.
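Victoria's history-retention example can be replayed with a small single-key model. It assumes that a versioned read returns null once its timestamp is older than the table's observed stream time minus history retention (my understanding of the versioned store contract). It also assumes the join buffer evicts a record once its timestamp is at most stream time minus the grace period. `RetentionMissDemo` and its nested classes are hypothetical. Late-record dropping is left out because the scenario contains no late records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical single-key model of the scenario above; not Kafka Streams code.
final class RetentionMissDemo {

    /** Versioned table: reads older than (observed stream time - history retention) return null. */
    static final class VersionedTable {
        private final long historyRetention;
        private final TreeMap<Long, String> versions = new TreeMap<>();
        private long observedStreamTime = Long.MIN_VALUE;

        VersionedTable(long historyRetention) { this.historyRetention = historyRetention; }

        void put(String value, long ts) {
            versions.put(ts, value);
            observedStreamTime = Math.max(observedStreamTime, ts);
        }

        String get(long asOf) {
            if (versions.isEmpty() || asOf < observedStreamTime - historyRetention) {
                return null; // no data yet, or history retention has elapsed for asOf
            }
            Map.Entry<Long, String> e = versions.floorEntry(asOf);
            return e == null ? null : e.getValue();
        }
    }

    static final class StreamRec {
        final String value;
        final long ts;
        StreamRec(String value, long ts) { this.value = value; this.ts = ts; }
    }

    /** Stream-side buffer that joins records once they fall out of the join grace period. */
    static final class BufferedJoin {
        private final long grace;
        private final VersionedTable table;
        private final PriorityQueue<StreamRec> buffer =
            new PriorityQueue<StreamRec>((a, b) -> Long.compare(a.ts, b.ts));
        private long streamTime = Long.MIN_VALUE; // advanced by stream-side records only
        final List<String> results = new ArrayList<>();

        BufferedJoin(long grace, VersionedTable table) { this.grace = grace; this.table = table; }

        void onStream(String value, long ts) {
            streamTime = Math.max(streamTime, ts);
            buffer.add(new StreamRec(value, ts));
            while (!buffer.isEmpty() && buffer.peek().ts <= streamTime - grace) {
                StreamRec r = buffer.poll();
                results.add("(" + r.value + ", " + table.get(r.ts) + ")"); // left-join style output
            }
        }
    }

    static List<String> run(long historyRetention, long grace) {
        VersionedTable table = new VersionedTable(historyRetention);
        BufferedJoin join = new BufferedJoin(grace, table);
        table.put("t0", 0);
        join.onStream("s1", 1);   // enters the buffer
        table.put("t10", 10);
        table.put("t20", 20);
        join.onStream("s21", 21); // evicts s1, but ts=1 may be outside history retention
        return join.results;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 10)); // [(s1, null)]
        System.out.println(run(20, 10)); // [(s1, t0)]
    }
}
```

With both parameters at 10, the model yields `(s1, null)`. Raising history retention to 20 yields `(s1, t0)`. Large timestamp gaps on the stream side can still cause misses, so a longer retention reduces the risk but does not remove it.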

I don't think any of these are reasons to not go forward with this KIP, but
it'd be good to call them out in the eventual documentation to decrease the
chance users get tripped up.

> We could maybe do an improvement later to advance stream time from table
> side as well, but that might be debatable as we might get more late records.

Yes, the likelihood of late records increases but also the likelihood of
"join misses" due to versioned store history retention having elapsed
decreases, which feels important for certain use cases. Either way, agreed
that it can be a discussion for the future as incorporating this would
substantially complicate the implementation.

Also a couple nits:

    - The KIP currently says "We recently added versioned tables which allow
    the table side of the a join [...] but it is not taken advantage of in
    joins," but this doesn't seem true? If the table of a stream-table join is
    versioned, then the DSL's stream-table join processor will automatically
    perform timestamped lookups into the table, in order to take advantage of
    the new timestamp-aware store to provide better join semantics.
    - The KIP mentions "grace period" for versioned stores in a number of
    places but I think you actually mean "history retention"? The two happen to
    be the same today (it is not an option for users to configure the two
    separately) but this need not be true in the future. "History retention"
    governs how far back in time reads may occur, which is the relevant
    parameter for performing lookups as part of the stream-table join. "Grace
    period" in the context of versioned stores refers to how far back in time
    out-of-order writes may occur, which probably isn't directly relevant for
    introducing a stream-side buffer, though it's also possible I've overlooked
    something. (As a bonus, switching from "table grace period" in the KIP to
    "table history retention" also helps to clarify/distinguish that it's a
    different parameter from the "join grace period," which I could see being
    confusing to readers. :) )


Cheers,
Victoria

On Thu, May 18, 2023 at 1:43 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hey all,

Thanks for the comments, they gave me a lot to think about. I'll try to
address them all in order. I have made some updates to the KIP related to
them, and I mention where below.

Lucas

Good idea about the example. I added a simple one.

1) I have thought about including options for the underlying buffer
configuration, one of which might be adding an in-memory option. My
biggest concern is about the semantic guarantees. This isn't like
suppress or windows, where producing incomplete results is relatively
harmless. Here we would possibly be producing incorrect results. I would
also like to keep the interface changes as simple as I can; I feel that
making more than this change to `Joined` could make this more
complicated than it needs to be. If we really want to, I could see adding
a grace() option with a `BufferConfig` in there or something, but I would
rather not.

2) The buffer will be independent of whether the table is versioned or
not. If table is not materialized it will materialize it as versioned. It
might make sense to do a follow-up KIP where we force the retention
period of the versioned table to be greater than whatever the max of the
stream buffer is.

Victoria

1) Yes, records will exit in timestamp order, not in offset order.
2) Late records will be dropped (late meaning outside the grace period).
From my understanding, that is the point of a grace period, no? Doesn't
the same thing happen with versioned stores?
3) The segment store already has an observed stream time, and we advance
based on that. That should only advance based on records that enter the
store, so yes, only stream-side records. We could maybe do an improvement later to
advance stream time from table side as well, but that might be debatable as
we might get more late records. Anyway, I would rather have that as a
separate discussion.

In-memory option? We can do that. For the buffer, I plan to use the
`TimeOrderedKeyValueBuffer` interface, which already has an in-memory
implementation, so it would be simple.

I said more in my answer to Lucas's question. The concern I have with
buffer configs or an in-memory option is complicating the interface.
There are also the semantic guarantees, but an in-memory option shouldn't
affect those.

Matthias

1) Fixed the out-of-order vs. late terminology in the KIP.

2) I was referring to having a stream. So after this KIP, we can have a
buffered stream or a normal one. For the table, we can use a versioned
table or a normal table.

3) Good callout. I clarified this as "If the table side uses a materialized
version store, it can store multiple versions of each record within its
defined grace period." and modified the rest of the paragraph a bit.

4) I get the point about preserving offset ordering, but if the stream is
buffered to join on timestamp instead of offset, doesn't it already seem
like we care more about time in this case?

If we end up adding more options, it might make sense to do this. Maybe
offset-order processing can be a follow-up?

I'll add a section for this in Rejected Alternatives. I think it makes
sense to do something like this, but maybe in a follow-up.

5) I hadn't thought about this. I suppose if they changed this in an
upgrade, the next record would either evict a lot of records (if the grace
period decreased) or there would be a pause until the new grace period is
reached (if it increased). Increasing is a bit more problematic, especially
if the table grace period and retention time stay the same. If the data is
reprocessed after a change like that, there would be different results,
but I feel like that would be expected after such a change.

What do you think should happen?

Hopefully this answers your questions!

Walker

On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for the KIP! Also some questions/comments from my side:

10) Notation: you use the term "late data" but I think you mean
out-of-order. We reserve the term "late" for records that arrive after
the grace period has passed, and thus, "late == out-of-order data that is
dropped".
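The distinction can be written down as a tiny classifier. It assumes stream time is the maximum timestamp observed so far and that a record exactly at stream time minus the grace period is still accepted. `ArrivalClassifier` and `Arrival` are hypothetical names used only for illustration.

```java
// Hypothetical helper illustrating the terminology; not a Kafka Streams API.
enum Arrival { IN_ORDER, OUT_OF_ORDER, LATE }

final class ArrivalClassifier {

    static Arrival classify(long recordTs, long streamTime, long gracePeriod) {
        if (recordTs >= streamTime) {
            return Arrival.IN_ORDER;
        }
        if (recordTs >= streamTime - gracePeriod) {
            return Arrival.OUT_OF_ORDER; // out-of-order, but still within the grace period
        }
        return Arrival.LATE;             // out-of-order AND past the grace period: dropped
    }

    public static void main(String[] args) {
        System.out.println(classify(105, 100, 10)); // IN_ORDER
        System.out.println(classify(95, 100, 10));  // OUT_OF_ORDER
        System.out.println(classify(89, 100, 10));  // LATE
    }
}
```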


20) "There is only one option from the stream side and only recently is
there a second option on the table side."

What are those options? Victoria already asked about the table side, but
I am also not sure what option you mean for the stream side?


30) "If the table side uses a materialized version store the value is
the latest by stream time rather than by offset within its defined grace
period."

The phrase "the value is the latest by stream time" is confusing: in
the end, a versioned store stores multiple versions, not just one.


40) I am also wondering about ordering. In general, KS tries to preserve
offset order during processing (with some exceptions, when offset-order
preservation is not clearly defined). Given that the stream-side buffer
is really just a "linear buffer", we could easily preserve offset order.
But I also see a benefit of re-ordering and emitting out-of-order data
right away when read (instead of blocking them behind in-order records
that are not ready yet). It might even be possible to let users
pick an emit strategy, e.g., "EmitStrategy.preserveOffsets" (name just a
placeholder).

The KIP should explain this in more detail and also discuss different
options and mention them in "Rejected alternatives" in case we don't
want to include them.


50) What happens when users change the grace period? Especially, when
they turn it on/off (but also increasing/decreasing is an interesting
point)? I think we should try to support this if possible; the
"Compatibility" section needs to cover switching on/off in more detail.


-Matthias




On 5/2/23 2:06 PM, Victoria Xia wrote:
Cool KIP, Walker! Thanks for sharing this proposal.

A few clarifications:

1. Is the order that records exit the buffer in necessarily the same as
the order that records enter the buffer in, or no? Based on the
description in the KIP, it sounds like the answer is no, i.e., records
will exit the buffer in increasing timestamp order, which means that they
may be reordered (even for the same key) compared to the input order.

2. What happens if the join grace period is nonzero, and a stream-side
record arrives with a timestamp that is older than the current stream
time minus the grace period? Will this record trigger a join result, or
will it be dropped? Based on the description for what happens when the
join grace period is set to zero, it sounds like the late record will be
dropped, even if the join grace period is nonzero. Is that true?

3. What could cause stream time to advance, for purposes of removing
records from the join buffer? For example, will new records arriving on
the table side of the join cause stream time to advance? From the KIP it
sounds like only stream-side records will advance stream time. Does that
mean that the join processor itself will have to track this stream time?

Also +1 to Lucas's question about what options will be available for
configuring the join buffer. Will users have the option to choose
whether they want the buffer to be in-memory vs persistent?

- Victoria

On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Walker,

thanks for the KIP! We definitely need this. I have two questions:

   - Have you considered allowing the customization of the underlying
buffer implementation? As far as I can see, `StreamJoined` lets you customize
the underlying store via a `WindowStoreSupplier`. Would it make sense
for `Joined` to have this as well? I can imagine one may want to limit
the number of records in the buffer, for example. If we hit the
maximum, the only option would be to drop semantic guarantees, but
users may still want to do this.
   - With "second option on the table side" you are referring to
versioned tables, right? Will the buffer on the stream side behave any
differently depending on whether the table side is versioned or not?
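Lucas's idea of capping the buffer could look roughly like the sketch below. It assumes that when the cap is exceeded, the oldest buffered records are emitted early, before their grace period has elapsed, which is exactly where the semantic guarantee is given up. `BoundedBufferDemo` and `maxRecords` are hypothetical; the KIP does not propose such an option on `Joined`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a size-capped stream-side buffer; maxRecords is not a real Joined option.
final class BoundedBufferDemo {
    private final long grace;
    private final int maxRecords;
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(); // timestamps, smallest first
    private long streamTime = Long.MIN_VALUE;

    BoundedBufferDemo(long grace, int maxRecords) {
        this.grace = grace;
        this.maxRecords = maxRecords;
    }

    /** Returns the timestamps emitted by this record, including any forced early emits. */
    List<Long> process(long ts) {
        List<Long> emitted = new ArrayList<>();
        streamTime = Math.max(streamTime, ts);
        if (ts < streamTime - grace) {
            return emitted; // late record: dropped
        }
        buffer.add(ts);
        // Normal eviction: the record's grace period has elapsed.
        while (!buffer.isEmpty() && buffer.peek() <= streamTime - grace) {
            emitted.add(buffer.poll());
        }
        // Cap exceeded: emit the oldest records early, giving up the grace-period guarantee.
        while (buffer.size() > maxRecords) {
            emitted.add(buffer.poll());
        }
        return emitted;
    }

    public static void main(String[] args) {
        BoundedBufferDemo b = new BoundedBufferDemo(100, 2);
        b.process(1);
        b.process(2);
        System.out.println(b.process(3)); // [1]: emitted before its grace period elapsed
    }
}
```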

Finally, I think a simple example in the motivation section could help
non-experts understand the KIP.

Best,
Lucas

On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
<wcarl...@confluent.io.invalid> wrote:

Hello everybody,

I have a proposal to improve the stream-table join by adding a grace
period and buffer to the stream side of the join, to allow processing in
timestamp order, matching the recent improvements of the versioned
tables.

Please take a look here <https://cwiki.apache.org/confluence/x/lAs0Dw>
and share your thoughts.

best,
Walker



