Good point, Victoria. I just removed the compacted-topic mention from the KIP. I agree with Bruno about using a normal (non-compacted) topic and deleting records that have already been processed.
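
To make that concrete, here is a minimal sketch of the purge, using the plain admin client the same way Streams purges repartition topics. The topic name, partition, and offset below are placeholders for illustration only:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class JoinBufferPurgeSketch {

    public static void purgeProcessedRecords(String bootstrapServers,
                                             TopicPartition bufferPartition,
                                             long processedUpToOffset) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        try (Admin admin = Admin.create(props)) {
            // Drop everything the join has already emitted; buffered records never change
            // after they are written, so offset-based deletion is enough and compaction
            // is not needed.
            admin.deleteRecords(
                    Map.of(bufferPartition, RecordsToDelete.beforeOffset(processedUpToOffset)))
                 .all()
                 .get();
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder names and values for illustration only.
        purgeProcessedRecords("localhost:9092", new TopicPartition("join-buffer-topic", 0), 1234L);
    }
}

In Streams itself this would presumably hook into the task commit path rather than run as a standalone client, but the deletion call is the same.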
On Tue, Jun 6, 2023 at 2:28 AM Bruno Cadonna <cado...@apache.org> wrote: > Hi, > > another idea that came to my mind. Instead of using a compacted topic, > the buffer could use a non-compacted topic and regularly delete records > before a given offset as Streams does for repartition topics. > > Best, > Bruno > > On 05.06.23 21:48, Bruno Cadonna wrote: > > Hi Victoria, > > > > that is a good point! > > > > I think, the topic needs to be a compacted topic to be able to get rid > > of records that are evicted from the buffer. So the key might be > > something with the key, the timestamp, and a sequence number to > > distinguish between records with the same key and same timestamp. > > > > Just an idea! Maybe Walker comes up with something better. > > > > Best, > > Bruno > > > > On 05.06.23 20:38, Victoria Xia wrote: > >> Hi Walker, > >> > >> Thanks for the latest updates! The KIP looks great. Just one question > >> about > >> the changelog topic for the join buffer: The KIP says "When a failure > >> occurs the buffer will try to recover from an OffsetCheckpoint if > >> possible. > >> If not it will reload the buffer from a compacted change-log topic." > This > >> is a new changelog topic that will be introduced specifically for the > >> join > >> buffer, right? Why is the changelog topic compacted? What are the keys? > I > >> am confused because the buffer contains records from the stream-side > >> of the > >> join, for which multiple records with the same key should be treated as > >> separate updates will all must be tracked in the buffer, rather than > >> updates which replace each other. > >> > >> Thanks, > >> Victoria > >> > >> On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org> > wrote: > >> > >>> Hi Walker, > >>> > >>> Thanks once more for the updates to the KIP! > >>> > >>> Do you also plan to expose metrics for the buffer? > >>> > >>> Best, > >>> Bruno > >>> > >>> On 02.06.23 17:16, Walker Carlson wrote: > >>>> Hello Bruno, > >>>> > >>>> I think this covers your questions. Let me know what you think > >>>> > >>>> 2. > >>>> We can use a changelog topic. I think we can treat it like any other > >>> store > >>>> and recover in the usual manner. Also implementation is on disk > >>>> > >>>> 3. > >>>> The description is in the public interfaces description. I will copy > it > >>>> into the proposed changes as well. > >>>> > >>>> This is a bit of an implementation detail that I didn't want to add > >>>> into > >>>> the kip, but the record will be added to the buffer to keep the stream > >>> time > >>>> consistent, it will just be ejected immediately. If of course if this > >>>> causes performance issues we will skip this step and track stream time > >>>> separately. I will update the kip to say that stream time advances > >>>> when a > >>>> stream record enters the node. > >>>> > >>>> Also, yes, updated. > >>>> > >>>> 5. > >>>> No there is no difference right now, everything gets processed as it > >>> comes > >>>> in and tries to find a record for its time stamp. > >>>> > >>>> Walker > >>>> > >>>> On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org> > >>>> wrote: > >>>> > >>>>> Hi Walker, > >>>>> > >>>>> Thanks for the updates! > >>>>> > >>>>> 2. > >>>>> It is still not clear to me how a failure is handled. I do not > >>>>> understand what you mean by "recover from an OffsetCheckpoint". > >>>>> > >>>>> My understanding is that the buffer needs to be replicated into its > >>>>> own > >>>>> Kafka topic. The input topic is not enough. 
The offset of a record is > >>>>> added to the offsets to commit once the record is streamed through > the > >>>>> subtopology. That means once the record is added to the buffer its > >>>>> offset is added to the offsets to commit -- independently of > >>>>> whether the > >>>>> record was evicted from the buffer and sent to the join node or not. > >>>>> Now, let's assume the following scenario > >>>>> 1. a record is read from the input topic and added to the buffer, but > >>>>> not evicted to be processed by the join node. > >>>>> 2. When the processing of the subtopology finishes the offset of the > >>>>> record is added to the offsets to commit. > >>>>> 3. A commit happens. > >>>>> 4. A failure happens > >>>>> > >>>>> After the failure the buffer is empty but the record will not be read > >>>>> anymore from the input topic since its offset has been already > >>>>> committed. The record is lost. > >>>>> One solution to avoid the loss is to recreate the buffer from a > >>>>> compacted Kafka topic as we do for suppression buffers. I do not > >>>>> think, > >>>>> we need any offset checkpoint here since, we keep the buffer in > >>>>> memory, > >>>>> right? Or do you plan to back the buffer with a persistent store? > Even > >>>>> in that case, a compacted Kafka topic would be needed. > >>>>> > >>>>> > >>>>> 3. > >>>>> From the KIP it is still not clear to me what happens if a > >>>>> record is > >>>>> outside of the grace period. I guess the record that falls outside of > >>>>> the grace period will not be added to the buffer, but will be send to > >>>>> the join node. Since it is outside of the grace period it will also > >>>>> not > >>>>> increase stream time and it will not trigger an eviction. Also the > >>>>> head > >>>>> of the buffer will not contain a record that needs to be evicted > since > >>>>> the the timestamp of the head record will be within the interval > >>>>> stream > >>>>> time minus grace period. Is this correct? Please add such a > >>>>> description > >>>>> to the KIP. > >>>>> Furthermore, I think there is a mistake in the text: > >>>>> "... will dequeue when the record timestamp is greater than stream > >>>>> time > >>>>> plus the grace period". I guess that should be "... will dequeue when > >>>>> the record timestamp is less than (or equal?) stream time minus the > >>>>> grace period" > >>>>> > >>>>> > >>>>> 5. > >>>>> What is the difference between not setting the grace period and > >>>>> setting > >>>>> it to zero? If there is a difference, why is there a difference? > >>>>> > >>>>> > >>>>> Best, > >>>>> Bruno > >>>>> > >>>>> > >>>>> On 01.06.23 23:58, Walker Carlson wrote: > >>>>>> Hey Bruno thanks for the feedback. > >>>>>> > >>>>>> 1) > >>>>>> I will add this to the kip, but stream time only advances as the > when > >>> the > >>>>>> buffer receives a new record. > >>>>>> > >>>>>> 2) > >>>>>> You are correct, I will add a failure section on to the kip. Since > >>>>>> the > >>>>>> records wont change in the buffer from when they are read from the > >>> topic > >>>>>> they are replicated already. > >>>>>> > >>>>>> 3) > >>>>>> I see that I'm out voted on the dropping of records thing. We will > >>>>>> pass > >>>>>> them on and try to join them if possible. This might cause some null > >>>>>> results, but increasing the table history retention should help > that. > >>>>>> > >>>>>> 4) > >>>>>> I can add some on the kip. But its pretty directly adding whatever > >>>>>> the > >>>>>> grace period is to the latency. I don't see a way around it. 
> >>>>>> > >>>>>> Walker > >>>>>> > >>>>>> On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org> > >>> wrote: > >>>>>> > >>>>>>> Hi Walker, > >>>>>>> > >>>>>>> thanks for the KIP! > >>>>>>> > >>>>>>> Here my feedback: > >>>>>>> > >>>>>>> 1. > >>>>>>> It is still not clear to me when stream time for the buffer > >>>>>>> advances. > >>>>>>> What is the event that let the stream time advance? In the > >>> discussion, I > >>>>>>> do not understand what you mean by "The segment store already has > an > >>>>>>> observed stream time, we advance based on that. That should only > >>> advance > >>>>>>> based on records that enter the store." Where does this segment > >>>>>>> store > >>>>>>> come from? Anyways, I think it would be great to also state how > >>>>>>> stream > >>>>>>> time advances in the KIP. > >>>>>>> > >>>>>>> 2. > >>>>>>> How does the buffer behave in case of a failure? I think I > >>>>>>> understand > >>>>>>> that the buffer will use an implementation of > >>> TimeOrderedKeyValueBuffer > >>>>>>> and therefore the records in the buffer will be replicated to a > >>>>>>> topic > >>> in > >>>>>>> Kafka, but I am not completely sure. Could you elaborate on this in > >>> the > >>>>>>> KIP? > >>>>>>> > >>>>>>> 3. > >>>>>>> I agree with Matthias about dropping late records. We use grace > >>> periods > >>>>>>> in scenarios where we records are grouped like in windowed > >>> aggregations > >>>>>>> and windowed joins. The stream buffer you propose does not really > >>> group > >>>>>>> any records. It rather delays records and reorders them. I am not > >>>>>>> sure > >>>>>>> if grace period is the right naming/concept to apply here. > >>>>>>> Instead of > >>>>>>> dropping records that fall outside of the buffer's time interval > the > >>>>>>> join should skip the buffer and try to join the record > >>>>>>> immediately. In > >>>>>>> the end, a stream-table join is a unwindowed join, i.e., no > grouping > >>> is > >>>>>>> applied to the records. > >>>>>>> What do you and other folks think about this proposal? > >>>>>>> > >>>>>>> 4. > >>>>>>> How does the proposed buffer, affects processing latency? Could you > >>>>>>> please add some words about this to the KIP? > >>>>>>> > >>>>>>> > >>>>>>> Best, > >>>>>>> Bruno > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On 31.05.23 01:49, Walker Carlson wrote: > >>>>>>>> Thanks for all the additional comments. I will either address them > >>> here > >>>>>>> or > >>>>>>>> update the kip accordingly. > >>>>>>>> > >>>>>>>> > >>>>>>>> I mentioned a follow kip to add extra features before and in the > >>>>>>> responses. > >>>>>>>> I will try to briefly summarize what options and optimizations I > >>>>>>>> plan > >>>>> to > >>>>>>>> include. If a concern is not covered in this list I for sure talk > >>> about > >>>>>>> it > >>>>>>>> below. > >>>>>>>> > >>>>>>>> * Allowing non versioned tables to still use the stream buffer > >>>>>>>> * Automatically materializing tables instead of forcing the user > to > >>> do > >>>>> it > >>>>>>>> * Configurable for in memory buffer > >>>>>>>> * Order the records in offset order or in time order > >>>>>>>> * Non memory use buffer (offset order, delayed pull from stream.) > >>>>>>>> * Time synced between stream and table side (maybe) > >>>>>>>> * Do not drop late records and process them as they come in > >>>>>>>> instead. > >>>>>>>> > >>>>>>>> > >>>>>>>> First, Victoria. > >>>>>>>> > >>>>>>>> 1) (One of your nits covers this, but you are correct it doesn't > >>>>>>>> make > >>>>>>>> sense. 
so I removed that part of the example.) > >>>>>>>> For those examples with the "bad" join results I said without > >>> buffering > >>>>>>> the > >>>>>>>> stream it would look like that, but that was incomplete. If the > >>>>>>>> look > >>> up > >>>>>>> was > >>>>>>>> simply looking at the latest version of the table when the stream > >>>>> records > >>>>>>>> came in then the results were possible. If we are using the > >>>>>>>> point in > >>>>> time > >>>>>>>> lookup that versioned tables let us then you are correct the > future > >>>>>>> results > >>>>>>>> are not possible. > >>>>>>>> > >>>>>>>> 2) I'll get to this later as Matthias brought up something > related. > >>>>>>>> > >>>>>>>> To your additional thoughts, I agree that we need to call those > >>> things > >>>>>>> out > >>>>>>>> in the documentation. I'm writing up a follow up kip with a lot of > >>> the > >>>>>>>> ideas we have discussed so that we can improve this feature beyond > >>> the > >>>>>>> base > >>>>>>>> implementation if it's needed. > >>>>>>>> > >>>>>>>> I addressed the nits in the kip. I somehow missed the table stream > >>>>> table > >>>>>>>> join processor improvement, it makes your first question make a > lot > >>>>> more > >>>>>>>> sense. Table history retention is a much cleaner way to > >>>>>>>> describe it. > >>>>>>>> > >>>>>>>> As to your mention of the syncing the time for the table and > >>>>>>>> stream. > >>>>>>>> Matthias mentioned that as well. I will address both here. I > >>>>>>>> plan to > >>>>>>> bring > >>>>>>>> that up in the future, but for now we will leave it out. I > >>>>>>>> suppose it > >>>>>>> will > >>>>>>>> be more useful after the table history retention is separable from > >>> the > >>>>>>>> table grace period. > >>>>>>>> > >>>>>>>> > >>>>>>>> To address Matthias comments. > >>>>>>>> > >>>>>>>> You are correct by saying the in memory store shouldn't cause any > >>>>>>> semantic > >>>>>>>> concerns. My concern would be more with if we limited the number > of > >>>>>>> records > >>>>>>>> on the buffer and what we would do if we hit said limits, > (emitting > >>>>> those > >>>>>>>> records might be an issue, throwing an error and halting would > >>>>>>>> not). > >>> I > >>>>>>>> think we can leave this discussion to the follow up kip along > >>>>>>>> with a > >>>>> few > >>>>>>>> other options. > >>>>>>>> > >>>>>>>> I will go through your proposals now. > >>>>>>>> > >>>>>>>> - don't support non-versioned KTables > >>>>>>>> > >>>>>>>> Sure, we can always expand this later on. Will include as part > >>>>>>>> of the > >>>>> of > >>>>>>>> the improvement kip > >>>>>>>> > >>>>>>>> - if grace period is added, users need to explicitly > >>>>>>>> materialize > >>>>> the > >>>>>>>> table as version (either directly, or upstream. Upstream only > works > >>> if > >>>>>>>> downstream tables "inherit" versioned semantics -- cf KIP-914) > >>>>>>>> > >>>>>>>> again, that works for me for now, if we find a use we can always > >>>>>>>> add > >>>>>>> later. 
> >>>>>>>> > >>>>>>>> - the table's history retention time must be larger than the > >>> grace > >>>>>>>> period (should be easy to check at runtime, when we build the > >>> topology) > >>>>>>>> > >>>>>>>> agreed > >>>>>>>> > >>>>>>>> - because switching from non-versioned to version stores > >>>>>>>> is not > >>>>>>>> backward compatibly (cf KIP-914), users need to take care of this > >>>>>>>> themselves, and this also implies that adding grace period is not > a > >>>>>>>> backward compatible change (even only if via indirect means) > >>>>>>>> > >>>>>>>> sure, this works > >>>>>>>> > >>>>>>>> As to the dropping of late records, I'm not sure. One one hand I > >>>>>>>> like > >>>>> not > >>>>>>>> dropping things. But on the other I struggle to see how a user can > >>>>> filter > >>>>>>>> out late records that might have incomplete join results. The > point > >>> in > >>>>>>> time > >>>>>>>> look up will aggressively expire old data and if new data has been > >>>>>>> replaced > >>>>>>>> it will return null if outside of the retention. This seems like > it > >>>>> could > >>>>>>>> corrupt the integrity of the join output. Seeing that we drop late > >>>>>>> records > >>>>>>>> on the table side as well I would think it makes sense to drop > late > >>>>>>> records > >>>>>>>> on the stream buffer. I could be convinced otherwise I suppose, I > >>> could > >>>>>>> see > >>>>>>>> adding this as an option in a follow up kip. It would be very > >>>>>>>> easy to > >>>>>>>> implement either way. For now unless no one else objects I'm > >>>>>>>> going to > >>>>>>> stick > >>>>>>>> with dropping the records for the sake of getting this kip > >>>>>>>> passed. It > >>>>> is > >>>>>>>> functionally a small change to make and we can update later if you > >>> feel > >>>>>>>> strongly about it. > >>>>>>>> > >>>>>>>> For the ordering. I have to say that it would be more > >>>>>>>> complicated to > >>>>>>>> implement it to be in offset order, if the goal it to get as > >>>>>>>> many of > >>>>> the > >>>>>>>> records validly joined as possible. Because we would process as > >>> things > >>>>>>> left > >>>>>>>> the buffer a sufficiency early enough record could hold up records > >>> that > >>>>>>>> would otherwise be valid past the table history retention. To fix > >>> this > >>>>> we > >>>>>>>> could process by timestamp then store in a second queue and emit > by > >>>>>>> offset, > >>>>>>>> but that would be a lot more complicated. If we didn't care > >>>>>>>> about not > >>>>>>>> missing some valid joins we could just have no store and pull from > >>> the > >>>>>>>> topic at a delay only caring about the timestamp of the next > >>>>>>>> offset. > >>>>> For > >>>>>>>> now I want to stick with the timestamp ordering as it makes much > >>>>>>>> more > >>>>>>> sense > >>>>>>>> to me, but would propose we add both of the other options I have > >>>>>>>> laid > >>>>> out > >>>>>>>> here in the follow up kip. > >>>>>>>> > >>>>>>>> Lastly, I think having an empty store with zero grace period > >>>>>>>> would be > >>>>>>> super > >>>>>>>> simple and not costly, so we might as well make it even if nothing > >>> gets > >>>>>>>> entered. > >>>>>>>> > >>>>>>>> I hope that address all your concerns, > >>>>>>>> > >>>>>>>> Walker > >>>>>>>> > >>>>>>>> On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <mj...@apache.org > > > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Walker, > >>>>>>>>> > >>>>>>>>> thanks for the updates. 
The KIP itself reads fine (of course > >>> Victoria > >>>>>>>>> made good comments about some phrases), but there is a couple of > >>>>> things > >>>>>>>>> from your latest reply I don't understand, and that I still think > >>> need > >>>>>>>>> some more discussions. > >>>>>>>>> > >>>>>>>>> Lukas, asked about in-memory option and `WindowStoreSupplier` and > >>> you > >>>>>>>>> mention "semantic concerns". There should not be any semantic > >>>>> difference > >>>>>>>>> from the underlying buffer implementation, so I am not sure > >>>>>>>>> what you > >>>>>>>>> mean here (also the relationship to suppress() is unclear to me)? > >>> -- I > >>>>>>>>> am ok to not make it configurable for now. We can always do it > >>>>>>>>> via a > >>>>>>>>> follow up KIP, and keep interface changes limited for now. > >>>>>>>>> > >>>>>>>>> Does it really make sense to allow a grace period if the table is > >>>>>>>>> non-versioned? You also say: "If table is not materialized it > will > >>>>>>>>> materialize it as versioned." -- What history retention time > would > >>> we > >>>>>>>>> pick for this case (also asked by Victoria)? Or should we > >>>>>>>>> rather not > >>>>>>>>> support this and force the user to materialize the table > >>>>>>>>> explicitly, > >>>>> and > >>>>>>>>> thus explicitly picking a history retention time? It's tradeoff > >>>>> between > >>>>>>>>> usability and guiding uses that there will be a significant > impact > >>> on > >>>>>>>>> disk usage. There is also compatibility concerns: If the table is > >>> not > >>>>>>>>> explicitly materialized in the old program, we would already > >>>>>>>>> need to > >>>>>>>>> materialize it also in the old program (of course, we would use a > >>>>>>>>> non-versioned store so far). Thus, if somebody adds a grace > >>>>>>>>> period, > >>> we > >>>>>>>>> cannot just switch the store type, as it would be a breaking > >>>>>>>>> change, > >>>>>>>>> potentially required an application re-set, or following the > >>>>>>>>> upgrade > >>>>>>>>> path for versioned state stores, and also changing the program to > >>>>>>>>> explicitly materialize using a versioned store. Also note, that > we > >>>>> might > >>>>>>>>> not materialize the actual join table, but only an upstream > table, > >>> and > >>>>>>>>> use `ValueGetter` to access the upstream data. > >>>>>>>>> > >>>>>>>>> To this end, as you already mentioned, history retention of the > >>> table > >>>>>>>>> should be at least grace period. You proposed to include this in > a > >>>>>>>>> follow up KIP, but I am wondering if it's a fundamental > >>>>>>>>> requirement > >>>>> and > >>>>>>>>> thus we should put a check in place right away and reject an > >>>>>>>>> invalid > >>>>>>>>> configuration? (It always easier to lift restriction than to > >>> introduce > >>>>>>>>> them later.) This would also imply that a non-versioned table > >>>>>>>>> cannot > >>>>> be > >>>>>>>>> supported, because it does not have a history retention that is > >>> larger > >>>>>>>>> than grace period, and maybe also answer the requirement about > >>>>>>>>> materialization: as we already always materialize something on > the > >>>>>>>>> tablet side as non-versioned store right now, it seems > >>>>>>>>> difficult to > >>>>>>>>> migrate the store to a versioned store. Ie, it might be ok to > push > >>> the > >>>>>>>>> burden onto the user and say: if you start using grace period, > you > >>>>> also > >>>>>>>>> need to manually switch from non-versioned to versioned KTables. 
> >>> Doing > >>>>>>>>> stuff automatically under the hood if very complex for this > >>>>>>>>> case, we > >>>>> if > >>>>>>>>> we push the burden onto the user, it might be ok to not > complicate > >>>>> this > >>>>>>>>> KIP significantly. > >>>>>>>>> > >>>>>>>>> To summarize the last two paragraphs, I would propose to: > >>>>>>>>> - don't support non-versioned KTables > >>>>>>>>> - if grace period is added, users need to explicitly > >>> materialize > >>>>> the > >>>>>>>>> table as version (either directly, or upstream. Upstream only > >>>>>>>>> works > >>> if > >>>>>>>>> downstream tables "inherit" versioned semantics -- cf KIP-914) > >>>>>>>>> - the table's history retention time must be larger than > the > >>> grace > >>>>>>>>> period (should be easy to check at runtime, when we build the > >>>>> topology) > >>>>>>>>> - because switching from non-versioned to version stores > >>>>>>>>> is not > >>>>>>>>> backward compatibly (cf KIP-914), users need to take care of this > >>>>>>>>> themselves, and this also implies that adding grace period is > >>>>>>>>> not a > >>>>>>>>> backward compatible change (even only if via indirect means) > >>>>>>>>> > >>>>>>>>> About dropping late records: wondering if we should never drop a > >>>>>>>>> stream-side record for a left-join, even if it's late? In > general, > >>> one > >>>>>>>>> thing I observed over the years is, that it's easier to keep > stuff > >>> and > >>>>>>>>> let users filter explicitly downstream (or make it configurable), > >>>>>>>>> instead of dropping pro-actively, because users have no good > >>>>>>>>> way to > >>>>>>>>> resurrect record that got already dropped. > >>>>>>>>> > >>>>>>>>> For ordering, sounds reasonable to me only start with one > >>>>>>>>> implementation, and maybe make it configurable as a follow up. > >>>>> However, > >>>>>>>>> I am wondering if starting with offset order might be the better > >>>>> option > >>>>>>>>> as it seems to align more with what we do so far? So instead of > >>>>> storing > >>>>>>>>> record ordered by timestamp, we can just store them ordered by > >>> offset, > >>>>>>>>> and still "poll" from the buffer based on the head records > >>> timestamp. > >>>>> Or > >>>>>>>>> would this complicate the implementation significantly? > >>>>>>>>> > >>>>>>>>> I also think it's ok to not "sync" stream-time between the > >>>>>>>>> table and > >>>>> the > >>>>>>>>> stream in this KIP, but we should consider doing this as a > >>>>>>>>> follow up > >>>>>>>>> change (not sure if we would need a KIP or not for a change this > >>>>> this). > >>>>>>>>> > >>>>>>>>> About increasing/decreasing grace period: what you describe make > >>> sense > >>>>>>>>> to me. If decreased, the next record would just trigger emitting > a > >>> lot > >>>>>>>>> of records, and for increase, the buffer would just need to "fill > >>> up" > >>>>>>>>> again. For reprocessing getting a different result with a > >>>>>>>>> different > >>>>>>>>> grace period is expected, so that's ok IMHO. -- There seems to be > >>> one > >>>>>>>>> special corner case: grace period zero. For this case, we > actually > >>>>> don't > >>>>>>>>> need any store, and the stream-side could be stateless. I think > it > >>> can > >>>>>>>>> have the same behavior, but if we want to "add / remove" the > store > >>>>>>>>> dynamically, we need to add specific code for it. 
For example, > >>>>>>>>> even > >>> if > >>>>>>>>> we start up with a grace period of zero, we would need to check > if > >>>>> there > >>>>>>>>> is a local store, and still emit everything in it, before we can > >>> ditch > >>>>>>>>> the store (not sure if that's even easily done at all). Or: we > >>>>>>>>> would > >>>>>>>>> need to have a store for _all_ cases, even if grace period is > zero > >>>>> (the > >>>>>>>>> store would be empty all the time though), to avoid super complex > >>>>> code? > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -Matthias > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 5/25/23 10:53 AM, Lucas Brutschy wrote: > >>>>>>>>>> Hi Walker, > >>>>>>>>>> > >>>>>>>>>> thanks for your responses. That makes sense. I guess there is > >>> always > >>>>>>>>>> the option to make the implementation more configurable later > on, > >>> if > >>>>>>>>>> users request it. Also thanks for the clarifications. From my > >>>>>>>>>> side, > >>>>>>>>>> the KIP is good to go. > >>>>>>>>>> > >>>>>>>>>> Cheers, > >>>>>>>>>> Lucas > >>>>>>>>>> > >>>>>>>>>> On Wed, May 24, 2023 at 11:54 PM Victoria Xia > >>>>>>>>>> <victoria....@confluent.io.invalid> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Thanks for the updates, Walker! Looks great, though I do have a > >>>>> couple > >>>>>>>>>>> questions about the latest updates: > >>>>>>>>>>> > >>>>>>>>>>> 1. The new example says that without stream-side > >>>>>>>>>>> buffering, > >>>>> "ex" > >>>>>>> and > >>>>>>>>>>> "fy" are possible join results. How could those join > >>> results > >>>>>>>>> happen? The > >>>>>>>>>>> example versioned table suggests that table record > >>>>>>>>>>> "x" has > >>>>>>>>> timestamp 2, and > >>>>>>>>>>> table record "y" has timestamp 3. If stream record > >>>>>>>>>>> "e" has > >>>>>>>>> timestamp 1, > >>>>>>>>>>> then it can never be joined against record "x", and > >>> similarly > >>>>> for > >>>>>>>>> stream > >>>>>>>>>>> record "f" with timestamp 2 being joined against "y". > >>>>>>>>>>> 2. I see in your replies above that "If table is not > >>>>>>> materialized it > >>>>>>>>>>> will materialize it as versioned" but I don't see this > >>> called > >>>>> out > >>>>>>>>> in the > >>>>>>>>>>> KIP -- seems worth calling out. Also, what will the > >>>>>>>>>>> history > >>>>>>>>> retention for > >>>>>>>>>>> the versioned table be? Will it be the same as the join > >>> grace > >>>>>>>>> period, or > >>>>>>>>>>> will it be greater? > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> And some additional thoughts: > >>>>>>>>>>> > >>>>>>>>>>> Sounds like there are a few things users should watch out for > >>>>>>>>>>> when > >>>>>>>>> enabling > >>>>>>>>>>> the stream-side buffer: > >>>>>>>>>>> > >>>>>>>>>>> - Records will get "stuck" if there are no newer > >>>>>>>>>>> records to > >>>>>>> advance > >>>>>>>>>>> stream time. > >>>>>>>>>>> - If there are large gaps between the timestamps of > >>>>> stream-side > >>>>>>>>> records, > >>>>>>>>>>> then it's possible that versioned store history > >>>>>>>>>>> retention > >>> will > >>>>>>> have > >>>>>>>>> expired > >>>>>>>>>>> by the time a record is evicted from the join buffer, > >>> leading > >>>>> to > >>>>>>> a > >>>>>>>>> join > >>>>>>>>>>> "miss." 
For example, if the join grace period and table > >>>>> history > >>>>>>>>> retention > >>>>>>>>>>> are both 10, and records come in the order: > >>>>>>>>>>> > >>>>>>>>>>> table side t0 with ts=0 > >>>>>>>>>>> stream side s1 with ts=1 <-- enters buffer > >>>>>>>>>>> table side t10 with ts=10 > >>>>>>>>>>> table side t20 with ts=20 > >>>>>>>>>>> stream side s21 with ts=21 <-- evicts record s1 from > >>> buffer, > >>>>> but > >>>>>>>>>>> versioned store no longer contains data for ts=1 due to > >>>>> history > >>>>>>>>> retention > >>>>>>>>>>> having elapsed > >>>>>>>>>>> > >>>>>>>>>>> This will result in the join result (s1, null) even > >>>>>>>>>>> though > >>> it > >>>>>>>>> should've > >>>>>>>>>>> been (s1, t0), due to t0 having been expired from the > >>>>> versioned > >>>>>>>>> store > >>>>>>>>>>> already. > >>>>>>>>>>> - Out-of-order records from the stream-side will be > >>> reordered, > >>>>>>> and > >>>>>>>>> late > >>>>>>>>>>> records will be dropped. > >>>>>>>>>>> > >>>>>>>>>>> I don't think any of these are reasons to not go forward with > >>>>>>>>>>> this > >>>>>>> KIP, > >>>>>>>>> but > >>>>>>>>>>> it'd be good to call them out in the eventual documentation to > >>>>>>> decrease > >>>>>>>>> the > >>>>>>>>>>> chance users get tripped up. > >>>>>>>>>>> > >>>>>>>>>>>> We could maybe do an improvement later to advance stream time > >>> from > >>>>>>>>> table > >>>>>>>>>>> side as well, but that might be debatable as we might get more > >>> late > >>>>>>>>> records. > >>>>>>>>>>> > >>>>>>>>>>> Yes, the likelihood of late records increases but also the > >>>>> likelihood > >>>>>>> of > >>>>>>>>>>> "join misses" due to versioned store history retention having > >>>>> elapsed > >>>>>>>>>>> decreases, which feels important for certain use cases. Either > >>> way, > >>>>>>>>> agreed > >>>>>>>>>>> that it can be a discussion for the future as incorporating > this > >>>>> would > >>>>>>>>>>> substantially complicate the implementation. > >>>>>>>>>>> > >>>>>>>>>>> Also a couple nits: > >>>>>>>>>>> > >>>>>>>>>>> - The KIP currently says "We recently added versioned > >>> tables > >>>>>>> which > >>>>>>>>> allow > >>>>>>>>>>> the table side of the a join [...] but it is not taken > >>>>> advantage > >>>>>>> of > >>>>>>>>> in > >>>>>>>>>>> joins," but this doesn't seem true? If the table of a > >>>>>>> stream-table > >>>>>>>>> join is > >>>>>>>>>>> versioned, then the DSL's stream-table join processor > >>>>>>>>>>> will > >>>>>>>>> automatically > >>>>>>>>>>> perform timestamped lookups into the table, in order to > >>> take > >>>>>>>>> advantage of > >>>>>>>>>>> the new timestamp-aware store to provide better join > >>>>> semantics. > >>>>>>>>>>> - The KIP mentions "grace period" for versioned > >>>>>>>>>>> stores in a > >>>>>>> number > >>>>>>>>> of > >>>>>>>>>>> places but I think you actually mean "history > >>>>>>>>>>> retention"? > >>> The > >>>>> two > >>>>>>>>> happen to > >>>>>>>>>>> be the same today (it is not an option for users to > >>> configure > >>>>> the > >>>>>>>>> two > >>>>>>>>>>> separately) but this need not be true in the future. > >>> "History > >>>>>>>>> retention" > >>>>>>>>>>> governs how far back in time reads may occur, which > >>>>>>>>>>> is the > >>>>>>> relevant > >>>>>>>>>>> parameter for performing lookups as part of the > >>> stream-table > >>>>>>> join. 
> >>>>>>>>> "Grace > >>>>>>>>>>> period" in the context of versioned stores refers to > how > >>> far > >>>>> back > >>>>>>>>> in time > >>>>>>>>>>> out-of-order writes may occur, which probably isn't > >>> directly > >>>>>>>>> relevant for > >>>>>>>>>>> introducing a stream-side buffer, though it's also > >>>>>>>>>>> possible > >>>>> I've > >>>>>>>>> overlooked > >>>>>>>>>>> something. (As a bonus, switching from "table grace > >>> period" in > >>>>>>> the > >>>>>>>>> KIP to > >>>>>>>>>>> "table history retention" also helps to > >>>>>>>>>>> clarify/distinguish > >>>>> that > >>>>>>>>> it's a > >>>>>>>>>>> different parameter from the "join grace period," > >>>>>>>>>>> which I > >>>>> could > >>>>>>> see > >>>>>>>>> being > >>>>>>>>>>> confusing to readers. :) ) > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> Victoria > >>>>>>>>>>> > >>>>>>>>>>> On Thu, May 18, 2023 at 1:43 PM Walker Carlson > >>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hey all, > >>>>>>>>>>>> > >>>>>>>>>>>> Thanks for the comments, they gave me a lot to think about. > >>>>>>>>>>>> I'll > >>>>> try > >>>>>>> to > >>>>>>>>>>>> address them all inorder. I have made some updates to the kip > >>>>> related > >>>>>>>>> to > >>>>>>>>>>>> them, but I mention where below. > >>>>>>>>>>>> > >>>>>>>>>>>> Lucas > >>>>>>>>>>>> > >>>>>>>>>>>> Good idea about the example. I added a simple one. > >>>>>>>>>>>> > >>>>>>>>>>>> 1) I have thought about including options for the underlying > >>> buffer > >>>>>>>>>>>> configuration. One of which might be adding an in memory > >>>>>>>>>>>> option. > >>> My > >>>>>>>>> biggest > >>>>>>>>>>>> concern is about the semantic guarantees. This isn't like > >>> suppress > >>>>> or > >>>>>>>>> with > >>>>>>>>>>>> windows where producing incomplete results is repetitively > >>>>> harmless. > >>>>>>>>> Here > >>>>>>>>>>>> we would be possibly producing incorrect results. I also would > >>> like > >>>>>>> to > >>>>>>>>> keep > >>>>>>>>>>>> the interface changes as simple as I can. Making more than > this > >>>>>>> change > >>>>>>>>> to > >>>>>>>>>>>> Joined I feel could make this more complicated than it needs > to > >>> be. > >>>>>>> If > >>>>>>>>> we > >>>>>>>>>>>> really want to I could see adding a grace() option with a > >>>>>>> BufferConifg > >>>>>>>>> in > >>>>>>>>>>>> there or something, but I would rather not. > >>>>>>>>>>>> > >>>>>>>>>>>> 2) The buffer will be independent of if the table is > >>>>>>>>>>>> versioned or > >>>>>>> not. > >>>>>>>>> If > >>>>>>>>>>>> table is not materialized it will materialize it as > >>>>>>>>>>>> versioned. It > >>>>>>> might > >>>>>>>>>>>> make sense to do a follow up kip where we force the retention > >>>>> period > >>>>>>>>> of > >>>>>>>>>>>> the versioned to be greater than whatever the max of the > stream > >>>>>>> buffer > >>>>>>>>> is. > >>>>>>>>>>>> > >>>>>>>>>>>> Victoria > >>>>>>>>>>>> > >>>>>>>>>>>> 1) Yes, records will exit in timestamp order not in offset > >>>>>>>>>>>> order. > >>>>>>>>>>>> 2) Late records will be dropped (Late as out of the grace > >>> period). > >>>>>>>>> From my > >>>>>>>>>>>> understanding that is the point of a grace period, no? Doesn't > >>> the > >>>>>>> same > >>>>>>>>>>>> thing happen with versioned stores? > >>>>>>>>>>>> 3) The segment store already has an observed stream time, we > >>>>> advance > >>>>>>>>> based > >>>>>>>>>>>> on that. That should only advance based on records that > >>>>>>>>>>>> enter the > >>>>>>>>> store. So > >>>>>>>>>>>> yes, only stream side records. 
We could maybe do an > improvement > >>>>> later > >>>>>>>>> to > >>>>>>>>>>>> advance stream time from table side as well, but that might be > >>>>>>>>> debatable as > >>>>>>>>>>>> we might get more late records. Anyways I would rather have > >>>>>>>>>>>> that > >>>>> as a > >>>>>>>>>>>> separate discussion. > >>>>>>>>>>>> > >>>>>>>>>>>> in memory option? We can do that, for the buffer I plan to use > >>> the > >>>>>>>>>>>> TimeOrderedKeyValueBuffer interface which already has an in > >>> memory > >>>>>>>>>>>> implantation, so it would be simple. > >>>>>>>>>>>> > >>>>>>>>>>>> I said more in my answer to Lucas's question. The concern I > >>>>>>>>>>>> have > >>>>> with > >>>>>>>>>>>> buffer configs or in memory is complicating the interface. > Also > >>>>>>>>> semantic > >>>>>>>>>>>> guarantees but in memory shouldn't effect that > >>>>>>>>>>>> > >>>>>>>>>>>> Matthias > >>>>>>>>>>>> > >>>>>>>>>>>> 1) fixed out of order vs late terminology in the kip. > >>>>>>>>>>>> > >>>>>>>>>>>> 2) I was referring to having a stream. So after this kip we > can > >>>>> have > >>>>>>> a > >>>>>>>>>>>> buffered stream or a normal one. For the table we can use a > >>>>> versioned > >>>>>>>>> table > >>>>>>>>>>>> or a normal table. > >>>>>>>>>>>> > >>>>>>>>>>>> 3 Good call out. I clarified this as "If the table side uses a > >>>>>>>>> materialized > >>>>>>>>>>>> version store, it can store multiple versions of each record > >>> within > >>>>>>> its > >>>>>>>>>>>> defined grace period." and modified the rest of the paragraph > a > >>>>> bit. > >>>>>>>>>>>> > >>>>>>>>>>>> 4) I get the preserving off offset ordering, but if the > >>>>>>>>>>>> stream is > >>>>>>>>> buffered > >>>>>>>>>>>> to join on timestamp instead of offset doesn't it already seem > >>> like > >>>>>>> we > >>>>>>>>> care > >>>>>>>>>>>> more about time in this case? > >>>>>>>>>>>> > >>>>>>>>>>>> If we end up adding more options it might make sense to do > >>>>>>>>>>>> this. > >>>>>>> Maybe > >>>>>>>>>>>> offset order processing can be a follow up? > >>>>>>>>>>>> > >>>>>>>>>>>> I'll add a section for this in Rejected Alternatives. I > >>>>>>>>>>>> think it > >>>>>>> makes > >>>>>>>>>>>> sense to do something like this but maybe in a follow up. > >>>>>>>>>>>> > >>>>>>>>>>>> 5) I hadn't thought about this. I suppose if they changed > >>>>>>>>>>>> this in > >>>>> an > >>>>>>>>>>>> upgrade the next record would either evict a lot of records > (if > >>> the > >>>>>>>>> grace > >>>>>>>>>>>> period decreased) or there would be a pause until the new > grace > >>>>>>> period > >>>>>>>>>>>> reached. Increasing is a bit more problematic, especially if > >>>>>>>>>>>> the > >>>>>>> table > >>>>>>>>>>>> grace period and retention time stays the same. If the data is > >>>>>>>>> reprocessed > >>>>>>>>>>>> after a change like that then there would be different > results, > >>>>> but I > >>>>>>>>> feel > >>>>>>>>>>>> like that would be expected after such a change. > >>>>>>>>>>>> > >>>>>>>>>>>> What do you think should happen? > >>>>>>>>>>>> > >>>>>>>>>>>> Hopefully this answers your questions! > >>>>>>>>>>>> > >>>>>>>>>>>> Walker > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax < > >>> mj...@apache.org> > >>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the KIP! Also some question/comments from my side: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 10) Notation: you use the term "late data" but I think you > >>>>>>>>>>>>> mean > >>>>>>>>>>>>> out-of-order. 
We reserve the term "late" to records that > >>>>>>>>>>>>> arrive > >>>>>>> after > >>>>>>>>>>>>> grace period passed, and thus, "late == out-of-order data > that > >>> is > >>>>>>>>>>>> dropped". > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> 20) "There is only one option from the stream side and only > >>>>> recently > >>>>>>>>> is > >>>>>>>>>>>>> there a second option on the table side." > >>>>>>>>>>>>> > >>>>>>>>>>>>> What are those options? Victoria already asked about the > table > >>>>> side, > >>>>>>>>> but > >>>>>>>>>>>>> I am also not sure what option you mean for the stream side? > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> 30) "If the table side uses a materialized version store the > >>> value > >>>>>>> is > >>>>>>>>>>>>> the latest by stream time rather than by offset within its > >>> defined > >>>>>>>>> grace > >>>>>>>>>>>>> period." > >>>>>>>>>>>>> > >>>>>>>>>>>>> The phrase "the value is the latest by stream time" is > >>>>>>>>>>>>> confusing > >>>>> -- > >>>>>>> in > >>>>>>>>>>>>> the end, a versioned stores multiple versions, not just one. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> 40) I am also wondering about ordering. In general, KS > >>>>>>>>>>>>> tries to > >>>>>>>>> preserve > >>>>>>>>>>>>> offset-order during processing (with some exception, when > >>>>>>>>>>>>> offset > >>>>>>> order > >>>>>>>>>>>>> preservation is not clearly defined). Given that the > >>>>>>>>>>>>> stream-side > >>>>>>>>> buffer > >>>>>>>>>>>>> is really just a "linear buffer", we could easily preserve > >>>>>>>>> offset-order. > >>>>>>>>>>>>> But I also see a benefit of re-ordering and emitting > >>> out-of-order > >>>>>>> data > >>>>>>>>>>>>> right away when read (instead of blocking them behind > in-order > >>>>>>> records > >>>>>>>>>>>>> that are not ready yet). -- It might even be a possibility, > to > >>> let > >>>>>>>>> users > >>>>>>>>>>>>> pick a emit strategy eg "EmitStrategy.preserveOffsets" (name > >>> just > >>>>> a > >>>>>>>>>>>>> placeholder). > >>>>>>>>>>>>> > >>>>>>>>>>>>> The KIP should explain this in more detail and also discuss > >>>>>>> different > >>>>>>>>>>>>> options and mention them in "Rejected alternatives" in case > we > >>>>> don't > >>>>>>>>>>>>> want to include them. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> 50) What happens when users change the grace period? > >>>>>>>>>>>>> Especially, > >>>>>>> when > >>>>>>>>>>>>> they turn it on/off (but also increasing/decreasing is an > >>>>>>> interesting > >>>>>>>>>>>>> point)? I think we should try to support this if possible; > the > >>>>>>>>>>>>> "Compatibility" section needs to cover switching on/off in > >>>>>>>>>>>>> more > >>>>>>>>> detail. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -Matthias > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On 5/2/23 2:06 PM, Victoria Xia wrote: > >>>>>>>>>>>>>> Cool KIP, Walker! Thanks for sharing this proposal. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> A few clarifications: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. Is the order that records exit the buffer in > >>>>>>>>>>>>>> necessarily the > >>>>>>> same > >>>>>>>>> as > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> order that records enter the buffer in, or no? 
Based on the > >>>>>>>>> description > >>>>>>>>>>>>> in > >>>>>>>>>>>>>> the KIP, it sounds like the answer is no, i.e., records will > >>> exit > >>>>>>> the > >>>>>>>>>>>>>> buffer in increasing timestamp order, which means that > >>>>>>>>>>>>>> they may > >>>>> be > >>>>>>>>>>>>> ordered > >>>>>>>>>>>>>> (even for the same key) compared to the input order. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2. What happens if the join grace period is nonzero, and a > >>>>>>>>> stream-side > >>>>>>>>>>>>>> record arrives with a timestamp that is older than the > >>>>>>>>>>>>>> current > >>>>>>> stream > >>>>>>>>>>>>> time > >>>>>>>>>>>>>> minus the grace period? Will this record trigger a join > >>>>>>>>>>>>>> result, > >>>>> or > >>>>>>>>> will > >>>>>>>>>>>>> it > >>>>>>>>>>>>>> be dropped? Based on the description for what happens when > >>>>>>>>>>>>>> the > >>>>> join > >>>>>>>>>>>> grace > >>>>>>>>>>>>>> period is set to zero, it sounds like the late record will > be > >>>>>>>>> dropped, > >>>>>>>>>>>>> even > >>>>>>>>>>>>>> if the join grace period is nonzero. Is that true? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 3. What could cause stream time to advance, for purposes of > >>>>>>> removing > >>>>>>>>>>>>>> records from the join buffer? For example, will new records > >>>>>>> arriving > >>>>>>>>> on > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> table side of the join cause stream time to advance? From > the > >>> KIP > >>>>>>> it > >>>>>>>>>>>>> sounds > >>>>>>>>>>>>>> like only stream-side records will advance stream time -- > >>>>>>>>>>>>>> does > >>>>> that > >>>>>>>>>>>> mean > >>>>>>>>>>>>>> that the join processor itself will have to track this > stream > >>>>> time? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Also +1 to Lucas's question about what options will be > >>> available > >>>>>>> for > >>>>>>>>>>>>>> configuring the join buffer. Will users have the option to > >>> choose > >>>>>>>>>>>> whether > >>>>>>>>>>>>>> they want the buffer to be in-memory vs persistent? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> - Victoria > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy > >>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> HI Walker, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> thanks for the KIP! We definitely need this. I have two > >>>>> questions: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> - Have you considered allowing the customization > >>>>>>>>>>>>>>> of the > >>>>>>>>> underlying > >>>>>>>>>>>>>>> buffer implementation? As I can see, `StreamJoined` lets > you > >>>>>>>>> customize > >>>>>>>>>>>>>>> the underlying store via a `WindowStoreSupplier`. Would it > >>> make > >>>>>>>>> sense > >>>>>>>>>>>>>>> for `Joined` to have this as well? I can imagine one may > >>>>>>>>>>>>>>> want > >>> to > >>>>>>>>> limit > >>>>>>>>>>>>>>> the number of records in the buffer, for example. If we hit > >>> the > >>>>>>>>>>>>>>> maximum, the only option would be to drop semantic > >>>>>>>>>>>>>>> guarantees, > >>>>> but > >>>>>>>>>>>>>>> users may still want to do this. > >>>>>>>>>>>>>>> - With "second option on the table side" you are > >>> referring > >>>>> to > >>>>>>>>>>>>>>> versioned tables, right? Will the buffer on the stream side > >>>>> behave > >>>>>>>>> any > >>>>>>>>>>>>>>> different whether the table side is versioned or not? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Finally, I think a simple example in the motivation section > >>>>> could > >>>>>>>>> help > >>>>>>>>>>>>>>> non-experts understand the KIP. 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Lucas > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson > >>>>>>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hello everybody, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> I have a stream proposal to improve the stream table > >>>>>>>>>>>>>>>> join by > >>>>>>>>> adding a > >>>>>>>>>>>>>>> grace > >>>>>>>>>>>>>>>> period and buffer to the stream side of the join to allow > >>>>>>>>> processing > >>>>>>>>>>>> in > >>>>>>>>>>>>>>>> timestamp order matching the recent improvements of the > >>>>> versioned > >>>>>>>>>>>>> tables. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Please take a look here < > >>>>>>>>>>>> https://cwiki.apache.org/confluence/x/lAs0Dw> > >>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> share your thoughts. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> best, > >>>>>>>>>>>>>>>> Walker > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> >
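
A rough usage sketch of what the proposal amounts to in the DSL, for anyone reading the thread end to end. The withGracePeriod name on Joined is illustrative only (the actual option is whatever the KIP ends up defining), and the table is materialized as a versioned store with history retention at least as long as the stream-side grace period, per the discussion above:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class StreamTableJoinGraceSketch {

    public static void buildTopology(StreamsBuilder builder) {
        // Table side: materialized as a versioned store so point-in-time lookups cover
        // the interval buffered on the stream side (history retention >= grace period).
        KTable<String, String> table = builder.table(
            "table-input",
            Materialized.<String, String>as(
                    Stores.persistentVersionedKeyValueStore("versioned-table-store", Duration.ofMinutes(10)))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        KStream<String, String> stream = builder.stream(
            "stream-input", Consumed.with(Serdes.String(), Serdes.String()));

        // Stream side: buffer records for up to 5 minutes and emit them in timestamp order.
        // withGracePeriod is a placeholder for whatever option the KIP finally adds to Joined.
        stream.join(
                  table,
                  (streamValue, tableValue) -> streamValue + "," + tableValue,
                  Joined.<String, String, String>with(Serdes.String(), Serdes.String(), Serdes.String())
                        .withGracePeriod(Duration.ofMinutes(5)))
              .to("join-output", Produced.with(Serdes.String(), Serdes.String()));
    }
}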