Good point, Victoria. I just removed the compacted topic mention from the
KIP. I agree with Bruno about using a normal topic and deleting records
that have been processed.
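
To make the "normal topic plus deletes" idea concrete, here is a rough,
self-contained sketch. It is an illustrative model only, not the KIP's
implementation: the class and method names are hypothetical, the changelog
is modelled as a plain offset counter, and evicting when the timestamp is
less than or equal to stream time minus the grace period is an assumption
taken from Bruno's suggested wording. Against a real topic the purge would
presumably map to Admin#deleteRecords with RecordsToDelete.beforeOffset(...);
the sketch only tracks which offset would be safe to pass.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Illustrative model only; not Kafka Streams code. All names are hypothetical. */
class JoinBufferSketch {
    /** A buffered stream-side record plus the offset it received in the modelled changelog. */
    static final class Entry {
        final long changelogOffset;
        final long timestamp;
        final String value;

        Entry(long changelogOffset, long timestamp, String value) {
            this.changelogOffset = changelogOffset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long gracePeriod;
    private long streamTime = -1L;        // assumes non-negative timestamps
    private long nextChangelogOffset = 0L;
    private long purgeBeforeOffset = 0L;

    // Ordered by timestamp, then by changelog offset, so records with the
    // same timestamp (and possibly the same key) remain separate entries.
    private final TreeSet<Entry> buffer = new TreeSet<>(
        Comparator.comparingLong((Entry e) -> e.timestamp)
                  .thenComparingLong(e -> e.changelogOffset));

    JoinBufferSketch(long gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    /** Returns the values that would be handed to the join for this input record. */
    List<String> process(long timestamp, String value) {
        List<String> toJoin = new ArrayList<>();
        if (timestamp < streamTime - gracePeriod) {
            // Late record: skips the buffer and does not advance stream time.
            toJoin.add(value);
            return toJoin;
        }
        streamTime = Math.max(streamTime, timestamp);
        buffer.add(new Entry(nextChangelogOffset++, timestamp, value));
        // Evict in timestamp order everything at or before streamTime - gracePeriod.
        while (!buffer.isEmpty() && buffer.first().timestamp <= streamTime - gracePeriod) {
            toJoin.add(buffer.pollFirst().value);
        }
        // Eviction is by timestamp but the changelog is in offset order, so only
        // offsets below the smallest offset still buffered are safe to delete.
        purgeBeforeOffset = buffer.stream()
            .mapToLong(e -> e.changelogOffset)
            .min()
            .orElse(nextChangelogOffset);
        return toJoin;
    }

    long purgeBeforeOffset() {
        return purgeBeforeOffset;
    }

    public static void main(String[] args) {
        JoinBufferSketch b = new JoinBufferSketch(10);
        b.process(1, "s1");
        b.process(5, "s5");
        b.process(3, "s3");
        System.out.println(b.process(14, "s14") + " purge before " + b.purgeBeforeOffset());
    }
}
```

With a grace period of 10, the record at ts=14 evicts s1 and s3, but only
offset 0 can be deleted: s5 sits at changelog offset 1 and is still
buffered, even though s3 (offset 2) has already been processed. That gap
between "processed" and "safe to delete" is the part I would want to spell
out in the KIP.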

On Tue, Jun 6, 2023 at 2:28 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi,
>
> another idea that came to my mind. Instead of using a compacted topic,
> the buffer could use a non-compacted topic and regularly delete records
> before a given offset as Streams does for repartition topics.
>
> Best,
> Bruno
>
> On 05.06.23 21:48, Bruno Cadonna wrote:
> > Hi Victoria,
> >
> > that is a good point!
> >
> > I think, the topic needs to be a compacted topic to be able to get rid
> > of records that are evicted from the buffer. So the key might be
> > something with the key, the timestamp, and a sequence number to
> > distinguish between records with the same key and same timestamp.
> >
> > Just an idea! Maybe Walker comes up with something better.
> >
> > Best,
> > Bruno
> >
> > On 05.06.23 20:38, Victoria Xia wrote:
> >> Hi Walker,
> >>
> >> Thanks for the latest updates! The KIP looks great. Just one question
> >> about
> >> the changelog topic for the join buffer: The KIP says "When a failure
> >> occurs the buffer will try to recover from an OffsetCheckpoint if
> >> possible.
> >> If not it will reload the buffer from a compacted change-log topic."
> This
> >> is a new changelog topic that will be introduced specifically for the
> >> join
> >> buffer, right? Why is the changelog topic compacted? What are the keys?
> I
> >> am confused because the buffer contains records from the stream-side
> >> of the
> >> join, for which multiple records with the same key should be treated as
> >> separate updates which must all be tracked in the buffer, rather than
> >> updates which replace each other.
> >>
> >> Thanks,
> >> Victoria
> >>
> >> On Mon, Jun 5, 2023 at 1:47 AM Bruno Cadonna <cado...@apache.org>
> wrote:
> >>
> >>> Hi Walker,
> >>>
> >>> Thanks once more for the updates to the KIP!
> >>>
> >>> Do you also plan to expose metrics for the buffer?
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 02.06.23 17:16, Walker Carlson wrote:
> >>>> Hello Bruno,
> >>>>
> >>>> I think this covers your questions. Let me know what you think
> >>>>
> >>>> 2.
> >>>> We can use a changelog topic. I think we can treat it like any other
> >>>> store and recover in the usual manner. Also, the implementation is on
> >>>> disk.
> >>>>
> >>>> 3.
> >>>> The description is in the public interfaces description. I will copy
> it
> >>>> into the proposed changes as well.
> >>>>
> >>>> This is a bit of an implementation detail that I didn't want to add
> >>>> into the KIP, but the record will be added to the buffer to keep stream
> >>>> time consistent; it will just be ejected immediately. Of course, if this
> >>>> causes performance issues we will skip this step and track stream time
> >>>> separately. I will update the KIP to say that stream time advances when
> >>>> a stream record enters the node.
> >>>>
> >>>> Also, yes, updated.
> >>>>
> >>>> 5.
> >>>> No there is no difference right now, everything gets processed as it
> >>> comes
> >>>> in and tries to find a record for its time stamp.
> >>>>
> >>>> Walker
> >>>>
> >>>> On Fri, Jun 2, 2023 at 6:41 AM Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Hi Walker,
> >>>>>
> >>>>> Thanks for the updates!
> >>>>>
> >>>>> 2.
> >>>>> It is still not clear to me how a failure is handled. I do not
> >>>>> understand what you mean by "recover from an OffsetCheckpoint".
> >>>>>
> >>>>> My understanding is that the buffer needs to be replicated into its
> >>>>> own
> >>>>> Kafka topic. The input topic is not enough. The offset of a record is
> >>>>> added to the offsets to commit once the record is streamed through
> the
> >>>>> subtopology. That means once the record is added to the buffer its
> >>>>> offset is added to the offsets to commit -- independently of
> >>>>> whether the
> >>>>> record was evicted from the buffer and sent to the join node or not.
> >>>>> Now, let's assume the following scenario
> >>>>> 1. a record is read from the input topic and added to the buffer, but
> >>>>> not evicted to be processed by the join node.
> >>>>> 2. When the processing of the subtopology finishes the offset of the
> >>>>> record is added to the offsets to commit.
> >>>>> 3. A commit happens.
> >>>>> 4. A failure happens
> >>>>>
> >>>>> After the failure the buffer is empty but the record will not be read
> >>>>> anymore from the input topic since its offset has been already
> >>>>> committed. The record is lost.
> >>>>> One solution to avoid the loss is to recreate the buffer from a
> >>>>> compacted Kafka topic as we do for suppression buffers. I do not
> >>>>> think,
> >>>>> we need any offset checkpoint here since, we keep the buffer in
> >>>>> memory,
> >>>>> right? Or do you plan to back the buffer with a persistent store?
> Even
> >>>>> in that case, a compacted Kafka topic would be needed.
> >>>>>
> >>>>>
> >>>>> 3.
> >>>>>    From the KIP it is still not clear to me what happens if a
> >>>>> record is
> >>>>> outside of the grace period. I guess the record that falls outside of
> >>>>> the grace period will not be added to the buffer, but will be sent to
> >>>>> the join node. Since it is outside of the grace period it will also not
> >>>>> increase stream time and it will not trigger an eviction. Also the head
> >>>>> of the buffer will not contain a record that needs to be evicted since
> >>>>> the timestamp of the head record will be within the interval stream
> >>>>> time minus grace period. Is this correct? Please add such a description
> >>>>> to the KIP.
> >>>>> Furthermore, I think there is a mistake in the text:
> >>>>> "... will dequeue when the record timestamp is greater than stream
> >>>>> time
> >>>>> plus the grace period". I guess that should be "... will dequeue when
> >>>>> the record timestamp is less than (or equal?) stream time minus the
> >>>>> grace period"
> >>>>>
> >>>>>
> >>>>> 5.
> >>>>> What is the difference between not setting the grace period and
> >>>>> setting
> >>>>> it to zero? If there is a difference, why is there a difference?
> >>>>>
> >>>>>
> >>>>> Best,
> >>>>> Bruno
> >>>>>
> >>>>>
> >>>>> On 01.06.23 23:58, Walker Carlson wrote:
> >>>>>> Hey Bruno thanks for the feedback.
> >>>>>>
> >>>>>> 1)
> >>>>>> I will add this to the KIP, but stream time only advances when the
> >>>>>> buffer receives a new record.
> >>>>>>
> >>>>>> 2)
> >>>>>> You are correct, I will add a failure section to the KIP. Since the
> >>>>>> records won't change in the buffer from when they are read from the
> >>>>>> topic, they are replicated already.
> >>>>>>
> >>>>>> 3)
> >>>>>> I see that I'm outvoted on the dropping of records thing. We will pass
> >>>>>> them on and try to join them if possible. This might cause some null
> >>>>>> results, but increasing the table history retention should help with
> >>>>>> that.
> >>>>>>
> >>>>>> 4)
> >>>>>> I can add some words on this to the KIP. But it's pretty directly
> >>>>>> adding whatever the grace period is to the latency. I don't see a way
> >>>>>> around it.
> >>>>>>
> >>>>>> Walker
> >>>>>>
> >>>>>> On Thu, Jun 1, 2023 at 5:23 AM Bruno Cadonna <cado...@apache.org>
> >>> wrote:
> >>>>>>
> >>>>>>> Hi Walker,
> >>>>>>>
> >>>>>>> thanks for the KIP!
> >>>>>>>
> >>>>>>> Here my feedback:
> >>>>>>>
> >>>>>>> 1.
> >>>>>>> It is still not clear to me when stream time for the buffer
> >>>>>>> advances.
> >>>>>>> What is the event that lets the stream time advance? In the
> >>> discussion, I
> >>>>>>> do not understand what you mean by "The segment store already has
> an
> >>>>>>> observed stream time, we advance based on that. That should only
> >>> advance
> >>>>>>> based on records that enter the store." Where does this segment
> >>>>>>> store
> >>>>>>> come from? Anyways, I think it would be great to also state how
> >>>>>>> stream
> >>>>>>> time advances in the KIP.
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> How does the buffer behave in case of a failure? I think I
> >>>>>>> understand
> >>>>>>> that the buffer will use an implementation of
> >>> TimeOrderedKeyValueBuffer
> >>>>>>> and therefore the records in the buffer will be replicated to a
> >>>>>>> topic
> >>> in
> >>>>>>> Kafka, but I am not completely sure. Could you elaborate on this in
> >>> the
> >>>>>>> KIP?
> >>>>>>>
> >>>>>>> 3.
> >>>>>>> I agree with Matthias about dropping late records. We use grace
> >>> periods
> >>>>>>> in scenarios where records are grouped, like in windowed
> >>> aggregations
> >>>>>>> and windowed joins. The stream buffer you propose does not really
> >>> group
> >>>>>>> any records. It rather delays records and reorders them. I am not
> >>>>>>> sure
> >>>>>>> if grace period is the right naming/concept to apply here.
> >>>>>>> Instead of
> >>>>>>> dropping records that fall outside of the buffer's time interval
> the
> >>>>>>> join should skip the buffer and try to join the record
> >>>>>>> immediately. In
> >>>>>>> the end, a stream-table join is an unwindowed join, i.e., no
> grouping
> >>> is
> >>>>>>> applied to the records.
> >>>>>>> What do you and other folks think about this proposal?
> >>>>>>>
> >>>>>>> 4.
> >>>>>>> How does the proposed buffer affect processing latency? Could you
> >>>>>>> please add some words about this to the KIP?
> >>>>>>>
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On 31.05.23 01:49, Walker Carlson wrote:
> >>>>>>>> Thanks for all the additional comments. I will either address them
> >>> here
> >>>>>>> or
> >>>>>>>> update the kip accordingly.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I mentioned a follow-up KIP to add extra features before and in the
> >>>>>>>> responses. I will try to briefly summarize what options and
> >>>>>>>> optimizations I plan to include. If a concern is not covered in this
> >>>>>>>> list, I will for sure talk about it below.
> >>>>>>>>
> >>>>>>>> * Allowing non versioned tables to still use the stream buffer
> >>>>>>>> * Automatically materializing tables instead of forcing the user
> to
> >>> do
> >>>>> it
> >>>>>>>> * Configurable for in memory buffer
> >>>>>>>> * Order the records in offset order or in time order
> >>>>>>>> * Non memory use buffer (offset order, delayed pull from stream.)
> >>>>>>>> * Time synced between stream and table side (maybe)
> >>>>>>>> * Do not drop late records and process them as they come in
> >>>>>>>> instead.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> First, Victoria.
> >>>>>>>>
> >>>>>>>> 1) (One of your nits covers this, but you are correct it doesn't
> >>>>>>>> make
> >>>>>>>> sense. so I removed that part of the example.)
> >>>>>>>> For those examples with the "bad" join results I said without
> >>> buffering
> >>>>>>> the
> >>>>>>>> stream it would look like that, but that was incomplete. If the
> >>>>>>>> look
> >>> up
> >>>>>>> was
> >>>>>>>> simply looking at the latest version of the table when the stream
> >>>>> records
> >>>>>>>> came in then the results were possible. If we are using the
> >>>>>>>> point in
> >>>>> time
> >>>>>>>> lookup that versioned tables let us then you are correct the
> future
> >>>>>>> results
> >>>>>>>> are not possible.
> >>>>>>>>
> >>>>>>>> 2) I'll get to this later as Matthias brought up something
> related.
> >>>>>>>>
> >>>>>>>> To your additional thoughts, I agree that we need to call those
> >>> things
> >>>>>>> out
> >>>>>>>> in the documentation. I'm writing up a follow up kip with a lot of
> >>> the
> >>>>>>>> ideas we have discussed so that we can improve this feature beyond
> >>> the
> >>>>>>> base
> >>>>>>>> implementation if it's needed.
> >>>>>>>>
> >>>>>>>> I addressed the nits in the kip. I somehow missed the table stream
> >>>>> table
> >>>>>>>> join processor improvement, it makes your first question make a
> lot
> >>>>> more
> >>>>>>>> sense.  Table history retention is a much cleaner way to
> >>>>>>>> describe it.
> >>>>>>>>
> >>>>>>>> As to your mention of the syncing the time for the table and
> >>>>>>>> stream.
> >>>>>>>> Matthias mentioned that as well. I will address both here. I
> >>>>>>>> plan to
> >>>>>>> bring
> >>>>>>>> that up in the future, but for now we will leave it out. I
> >>>>>>>> suppose it
> >>>>>>> will
> >>>>>>>> be more useful after the table history retention is separable from
> >>> the
> >>>>>>>> table grace period.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> To address Matthias's comments.
> >>>>>>>>
> >>>>>>>> You are correct by saying the in memory store shouldn't cause any
> >>>>>>> semantic
> >>>>>>>> concerns. My concern would be more with if we limited the number
> of
> >>>>>>> records
> >>>>>>>> on the buffer and what we would do if we hit said limits,
> (emitting
> >>>>> those
> >>>>>>>> records might be an issue, throwing an error and halting would
> >>>>>>>> not).
> >>> I
> >>>>>>>> think we can leave this discussion to the follow up kip along
> >>>>>>>> with a
> >>>>> few
> >>>>>>>> other options.
> >>>>>>>>
> >>>>>>>> I will go through your proposals now.
> >>>>>>>>
> >>>>>>>>       - don't support non-versioned KTables
> >>>>>>>>
> >>>>>>>> Sure, we can always expand this later on. Will include as part of the
> >>>>>>>> improvement KIP.
> >>>>>>>>
> >>>>>>>>       - if grace period is added, users need to explicitly
> >>>>>>>> materialize the table as versioned (either directly, or upstream.
> >>>>>>>> Upstream only works if downstream tables "inherit" versioned
> >>>>>>>> semantics -- cf KIP-914)
> >>>>>>>>
> >>>>>>>> again, that works for me for now, if we find a use we can always
> >>>>>>>> add
> >>>>>>> later.
> >>>>>>>>
> >>>>>>>>       - the table's history retention time must be larger than the
> >>> grace
> >>>>>>>> period (should be easy to check at runtime, when we build the
> >>> topology)
> >>>>>>>>
> >>>>>>>> agreed
> >>>>>>>>
> >>>>>>>>       - because switching from non-versioned to versioned stores is not
> >>>>>>>> backward compatible (cf KIP-914), users need to take care of this
> >>>>>>>> themselves, and this also implies that adding grace period is not
> a
> >>>>>>>> backward compatible change (even only if via indirect means)
> >>>>>>>>
> >>>>>>>> sure, this works
> >>>>>>>>
> >>>>>>>> As to the dropping of late records, I'm not sure. On one hand I
> >>>>>>>> like
> >>>>> not
> >>>>>>>> dropping things. But on the other I struggle to see how a user can
> >>>>> filter
> >>>>>>>> out late records that might have incomplete join results. The
> point
> >>> in
> >>>>>>> time
> >>>>>>>> look up will aggressively expire old data and if new data has been
> >>>>>>> replaced
> >>>>>>>> it will return null if outside of the retention. This seems like
> it
> >>>>> could
> >>>>>>>> corrupt the integrity of the join output. Seeing that we drop late
> >>>>>>> records
> >>>>>>>> on the table side as well I would think it makes sense to drop
> late
> >>>>>>> records
> >>>>>>>> on the stream buffer. I could be convinced otherwise I suppose, I
> >>> could
> >>>>>>> see
> >>>>>>>> adding this as an option in a follow up kip. It would be very
> >>>>>>>> easy to
> >>>>>>>> implement either way. For now, unless anyone objects, I'm
> >>>>>>>> going to
> >>>>>>> stick
> >>>>>>>> with dropping the records for the sake of getting this kip
> >>>>>>>> passed. It
> >>>>> is
> >>>>>>>> functionally a small change to make and we can update later if you
> >>> feel
> >>>>>>>> strongly about it.
> >>>>>>>>
> >>>>>>>> For the ordering. I have to say that it would be more
> >>>>>>>> complicated to
> >>>>>>>> implement it to be in offset order, if the goal is to get as
> >>>>>>>> many of
> >>>>> the
> >>>>>>>> records validly joined as possible. Because we would process as
> >>> things
> >>>>>>> left
> >>>>>>>> the buffer, a sufficiently early record could hold up records
> >>> that
> >>>>>>>> would otherwise be valid past the table history retention. To fix
> >>> this
> >>>>> we
> >>>>>>>> could process by timestamp then store in a second queue and emit
> by
> >>>>>>> offset,
> >>>>>>>> but that would be a lot more complicated. If we didn't care
> >>>>>>>> about not
> >>>>>>>> missing some valid joins we could just have no store and pull from
> >>> the
> >>>>>>>> topic at a delay only caring about the timestamp of the next
> >>>>>>>> offset.
> >>>>> For
> >>>>>>>> now I want to stick with the timestamp ordering as it makes much
> >>>>>>>> more
> >>>>>>> sense
> >>>>>>>> to me, but would propose we add both of the other options I have
> >>>>>>>> laid
> >>>>> out
> >>>>>>>> here in the follow up kip.
> >>>>>>>>
> >>>>>>>> Lastly, I think having an empty store with zero grace period
> >>>>>>>> would be
> >>>>>>> super
> >>>>>>>> simple and not costly, so we might as well make it even if nothing
> >>> gets
> >>>>>>>> entered.
> >>>>>>>>
> >>>>>>>> I hope that addresses all your concerns,
> >>>>>>>>
> >>>>>>>> Walker
> >>>>>>>>
> >>>>>>>> On Thu, May 25, 2023 at 9:50 AM Matthias J. Sax <mj...@apache.org
> >
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Walker,
> >>>>>>>>>
> >>>>>>>>> thanks for the updates. The KIP itself reads fine (of course
> >>> Victoria
> >>>>>>>>> made good comments about some phrases), but there are a couple of
> >>>>> things
> >>>>>>>>> from your latest reply I don't understand, and that I still think
> >>> need
> >>>>>>>>> some more discussions.
> >>>>>>>>>
> >>>>>>>>> Lukas, asked about in-memory option and `WindowStoreSupplier` and
> >>> you
> >>>>>>>>> mention "semantic concerns". There should not be any semantic
> >>>>> difference
> >>>>>>>>> from the underlying buffer implementation, so I am not sure
> >>>>>>>>> what you
> >>>>>>>>> mean here (also the relationship to suppress() is unclear to me)?
> >>> -- I
> >>>>>>>>> am ok to not make it configurable for now. We can always do it
> >>>>>>>>> via a
> >>>>>>>>> follow up KIP, and keep interface changes limited for now.
> >>>>>>>>>
> >>>>>>>>> Does it really make sense to allow a grace period if the table is
> >>>>>>>>> non-versioned? You also say: "If table is not materialized it
> will
> >>>>>>>>> materialize it as versioned." -- What history retention time
> would
> >>> we
> >>>>>>>>> pick for this case (also asked by Victoria)? Or should we
> >>>>>>>>> rather not
> >>>>>>>>> support this and force the user to materialize the table
> >>>>>>>>> explicitly,
> >>>>> and
> >>>>>>>>> thus explicitly picking a history retention time? It's a tradeoff
> >>>>>>>>> between usability and guiding users that there will be a significant
> >>>>>>>>> impact on disk usage. There are also compatibility concerns: If the table is
> >>> not
> >>>>>>>>> explicitly materialized in the old program, we would already
> >>>>>>>>> need to
> >>>>>>>>> materialize it also in the old program (of course, we would use a
> >>>>>>>>> non-versioned store so far). Thus, if somebody adds a grace
> >>>>>>>>> period,
> >>> we
> >>>>>>>>> cannot just switch the store type, as it would be a breaking
> >>>>>>>>> change,
> >>>>>>>>> potentially requiring an application reset, or following the
> >>>>>>>>> upgrade
> >>>>>>>>> path for versioned state stores, and also changing the program to
> >>>>>>>>> explicitly materialize using a versioned store. Also note, that
> we
> >>>>> might
> >>>>>>>>> not materialize the actual join table, but only an upstream
> table,
> >>> and
> >>>>>>>>> use `ValueGetter` to access the upstream data.
> >>>>>>>>>
> >>>>>>>>> To this end, as you already mentioned, history retention of the
> >>> table
> >>>>>>>>> should be at least grace period. You proposed to include this in
> a
> >>>>>>>>> follow up KIP, but I am wondering if it's a fundamental
> >>>>>>>>> requirement
> >>>>> and
> >>>>>>>>> thus we should put a check in place right away and reject an
> >>>>>>>>> invalid
> >>>>>>>>> configuration? (It's always easier to lift restrictions than to
> >>> introduce
> >>>>>>>>> them later.) This would also imply that a non-versioned table
> >>>>>>>>> cannot
> >>>>> be
> >>>>>>>>> supported, because it does not have a history retention that is
> >>> larger
> >>>>>>>>> than grace period, and maybe also answer the requirement about
> >>>>>>>>> materialization: as we already always materialize something on
> the
> >>>>>>>>> table side as a non-versioned store right now, it seems
> >>>>>>>>> difficult to
> >>>>>>>>> migrate the store to a versioned store. Ie, it might be ok to
> push
> >>> the
> >>>>>>>>> burden onto the user and say: if you start using grace period,
> you
> >>>>> also
> >>>>>>>>> need to manually switch from non-versioned to versioned KTables.
> >>>>>>>>> Doing stuff automatically under the hood is very complex for this
> >>>>>>>>> case, so if we push the burden onto the user, it might be ok to not
> >>>>>>>>> complicate this KIP significantly.
> >>>>>>>>>
> >>>>>>>>> To summarize the last two paragraphs, I would propose to:
> >>>>>>>>>       - don't support non-versioned KTables
> >>>>>>>>>       - if grace period is added, users need to explicitly
> >>> materialize
> >>>>> the
> >>>>>>>>> table as versioned (either directly, or upstream. Upstream only
> >>>>>>>>> works
> >>> if
> >>>>>>>>> downstream tables "inherit" versioned semantics -- cf KIP-914)
> >>>>>>>>>       - the table's history retention time must be larger than
> the
> >>> grace
> >>>>>>>>> period (should be easy to check at runtime, when we build the
> >>>>> topology)
> >>>>>>>>>       - because switching from non-versioned to versioned stores is not
> >>>>>>>>> backward compatible (cf KIP-914), users need to take care of this
> >>>>>>>>> themselves, and this also implies that adding grace period is
> >>>>>>>>> not a
> >>>>>>>>> backward compatible change (even only if via indirect means)
> >>>>>>>>>
> >>>>>>>>> About dropping late records: wondering if we should never drop a
> >>>>>>>>> stream-side record for a left-join, even if it's late? In
> general,
> >>> one
> >>>>>>>>> thing I observed over the years is, that it's easier to keep
> stuff
> >>> and
> >>>>>>>>> let users filter explicitly downstream (or make it configurable),
> >>>>>>>>> instead of dropping pro-actively, because users have no good
> >>>>>>>>> way to
> >>>>>>>>> resurrect records that already got dropped.
> >>>>>>>>>
> >>>>>>>>> For ordering, it sounds reasonable to me to only start with one
> >>>>>>>>> implementation, and maybe make it configurable as a follow up.
> >>>>> However,
> >>>>>>>>> I am wondering if starting with offset order might be the better
> >>>>> option
> >>>>>>>>> as it seems to align more with what we do so far? So instead of
> >>>>> storing
> >>>>>>>>> record ordered by timestamp, we can just store them ordered by
> >>> offset,
> >>>>>>>>> and still "poll" from the buffer based on the head record's
> >>> timestamp.
> >>>>> Or
> >>>>>>>>> would this complicate the implementation significantly?
> >>>>>>>>>
> >>>>>>>>> I also think it's ok to not "sync" stream-time between the
> >>>>>>>>> table and
> >>>>> the
> >>>>>>>>> stream in this KIP, but we should consider doing this as a
> >>>>>>>>> follow up
> >>>>>>>>> change (not sure if we would need a KIP or not for a change like
> >>>>>>>>> this).
> >>>>>>>>>
> >>>>>>>>> About increasing/decreasing grace period: what you describe makes
> >>> sense
> >>>>>>>>> to me. If decreased, the next record would just trigger emitting
> a
> >>> lot
> >>>>>>>>> of records, and for increase, the buffer would just need to "fill
> >>> up"
> >>>>>>>>> again. For reprocessing getting a different result with a
> >>>>>>>>> different
> >>>>>>>>> grace period is expected, so that's ok IMHO. -- There seems to be
> >>> one
> >>>>>>>>> special corner case: grace period zero. For this case, we
> actually
> >>>>> don't
> >>>>>>>>> need any store, and the stream-side could be stateless. I think
> it
> >>> can
> >>>>>>>>> have the same behavior, but if we want to "add / remove" the
> store
> >>>>>>>>> dynamically, we need to add specific code for it. For example,
> >>>>>>>>> even
> >>> if
> >>>>>>>>> we start up with a grace period of zero, we would need to check
> if
> >>>>> there
> >>>>>>>>> is a local store, and still emit everything in it, before we can
> >>> ditch
> >>>>>>>>> the store (not sure if that's even easily done at all). Or: we
> >>>>>>>>> would
> >>>>>>>>> need to have a store for _all_ cases, even if grace period is
> zero
> >>>>> (the
> >>>>>>>>> store would be empty all the time though), to avoid super complex
> >>>>> code?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/25/23 10:53 AM, Lucas Brutschy wrote:
> >>>>>>>>>> Hi Walker,
> >>>>>>>>>>
> >>>>>>>>>> thanks for your responses. That makes sense. I guess there is
> >>> always
> >>>>>>>>>> the option to make the implementation more configurable later
> on,
> >>> if
> >>>>>>>>>> users request it. Also thanks for the clarifications. From my
> >>>>>>>>>> side,
> >>>>>>>>>> the KIP is good to go.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Lucas
> >>>>>>>>>>
> >>>>>>>>>> On Wed, May 24, 2023 at 11:54 PM Victoria Xia
> >>>>>>>>>> <victoria....@confluent.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the updates, Walker! Looks great, though I do have a
> >>>>> couple
> >>>>>>>>>>> questions about the latest updates:
> >>>>>>>>>>>
> >>>>>>>>>>>         1. The new example says that without stream-side
> >>>>>>>>>>> buffering,
> >>>>> "ex"
> >>>>>>> and
> >>>>>>>>>>>         "fy" are possible join results. How could those join
> >>> results
> >>>>>>>>> happen? The
> >>>>>>>>>>>         example versioned table suggests that table record
> >>>>>>>>>>> "x" has
> >>>>>>>>> timestamp 2, and
> >>>>>>>>>>>         table record "y" has timestamp 3. If stream record
> >>>>>>>>>>> "e" has
> >>>>>>>>> timestamp 1,
> >>>>>>>>>>>         then it can never be joined against record "x", and
> >>> similarly
> >>>>> for
> >>>>>>>>> stream
> >>>>>>>>>>>         record "f" with timestamp 2 being joined against "y".
> >>>>>>>>>>>         2. I see in your replies above that "If table is not
> >>>>>>> materialized it
> >>>>>>>>>>>         will materialize it as versioned" but I don't see this
> >>> called
> >>>>> out
> >>>>>>>>> in the
> >>>>>>>>>>>         KIP -- seems worth calling out. Also, what will the
> >>>>>>>>>>> history
> >>>>>>>>> retention for
> >>>>>>>>>>>         the versioned table be? Will it be the same as the join
> >>> grace
> >>>>>>>>> period, or
> >>>>>>>>>>>         will it be greater?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> And some additional thoughts:
> >>>>>>>>>>>
> >>>>>>>>>>> Sounds like there are a few things users should watch out for
> >>>>>>>>>>> when
> >>>>>>>>> enabling
> >>>>>>>>>>> the stream-side buffer:
> >>>>>>>>>>>
> >>>>>>>>>>>         - Records will get "stuck" if there are no newer
> >>>>>>>>>>> records to
> >>>>>>> advance
> >>>>>>>>>>>         stream time.
> >>>>>>>>>>>         - If there are large gaps between the timestamps of
> >>>>> stream-side
> >>>>>>>>> records,
> >>>>>>>>>>>         then it's possible that versioned store history
> >>>>>>>>>>> retention
> >>> will
> >>>>>>> have
> >>>>>>>>> expired
> >>>>>>>>>>>         by the time a record is evicted from the join buffer,
> >>> leading
> >>>>> to
> >>>>>>> a
> >>>>>>>>> join
> >>>>>>>>>>>         "miss." For example, if the join grace period and table
> >>>>> history
> >>>>>>>>> retention
> >>>>>>>>>>>         are both 10, and records come in the order:
> >>>>>>>>>>>
> >>>>>>>>>>>         table side t0 with ts=0
> >>>>>>>>>>>         stream side s1 with ts=1 <-- enters buffer
> >>>>>>>>>>>         table side t10 with ts=10
> >>>>>>>>>>>         table side t20 with ts=20
> >>>>>>>>>>>         stream side s21 with ts=21 <-- evicts record s1 from
> >>> buffer,
> >>>>> but
> >>>>>>>>>>>         versioned store no longer contains data for ts=1 due to
> >>>>> history
> >>>>>>>>> retention
> >>>>>>>>>>>         having elapsed
> >>>>>>>>>>>
> >>>>>>>>>>>         This will result in the join result (s1, null) even
> >>>>>>>>>>> though
> >>> it
> >>>>>>>>> should've
> >>>>>>>>>>>         been (s1, t0), due to t0 having been expired from the
> >>>>> versioned
> >>>>>>>>> store
> >>>>>>>>>>>         already.
> >>>>>>>>>>>         - Out-of-order records from the stream-side will be
> >>> reordered,
> >>>>>>> and
> >>>>>>>>> late
> >>>>>>>>>>>         records will be dropped.
> >>>>>>>>>>>
> >>>>>>>>>>> I don't think any of these are reasons to not go forward with
> >>>>>>>>>>> this
> >>>>>>> KIP,
> >>>>>>>>> but
> >>>>>>>>>>> it'd be good to call them out in the eventual documentation to
> >>>>>>> decrease
> >>>>>>>>> the
> >>>>>>>>>>> chance users get tripped up.
> >>>>>>>>>>>
> >>>>>>>>>>>> We could maybe do an improvement later to advance stream time
> >>> from
> >>>>>>>>> table
> >>>>>>>>>>> side as well, but that might be debatable as we might get more
> >>> late
> >>>>>>>>> records.
> >>>>>>>>>>>
> >>>>>>>>>>> Yes, the likelihood of late records increases but also the
> >>>>> likelihood
> >>>>>>> of
> >>>>>>>>>>> "join misses" due to versioned store history retention having
> >>>>> elapsed
> >>>>>>>>>>> decreases, which feels important for certain use cases. Either
> >>> way,
> >>>>>>>>> agreed
> >>>>>>>>>>> that it can be a discussion for the future as incorporating
> this
> >>>>> would
> >>>>>>>>>>> substantially complicate the implementation.
> >>>>>>>>>>>
> >>>>>>>>>>> Also a couple nits:
> >>>>>>>>>>>
> >>>>>>>>>>>         - The KIP currently says "We recently added versioned
> >>> tables
> >>>>>>> which
> >>>>>>>>> allow
> >>>>>>>>>>>         the table side of the a join [...] but it is not taken
> >>>>> advantage
> >>>>>>> of
> >>>>>>>>> in
> >>>>>>>>>>>         joins," but this doesn't seem true? If the table of a
> >>>>>>> stream-table
> >>>>>>>>> join is
> >>>>>>>>>>>         versioned, then the DSL's stream-table join processor
> >>>>>>>>>>> will
> >>>>>>>>> automatically
> >>>>>>>>>>>         perform timestamped lookups into the table, in order to
> >>> take
> >>>>>>>>> advantage of
> >>>>>>>>>>>         the new timestamp-aware store to provide better join
> >>>>> semantics.
> >>>>>>>>>>>         - The KIP mentions "grace period" for versioned
> >>>>>>>>>>> stores in a
> >>>>>>> number
> >>>>>>>>> of
> >>>>>>>>>>>         places but I think you actually mean "history
> >>>>>>>>>>> retention"?
> >>> The
> >>>>> two
> >>>>>>>>> happen to
> >>>>>>>>>>>         be the same today (it is not an option for users to
> >>> configure
> >>>>> the
> >>>>>>>>> two
> >>>>>>>>>>>         separately) but this need not be true in the future.
> >>> "History
> >>>>>>>>> retention"
> >>>>>>>>>>>         governs how far back in time reads may occur, which
> >>>>>>>>>>> is the
> >>>>>>> relevant
> >>>>>>>>>>>         parameter for performing lookups as part of the
> >>> stream-table
> >>>>>>> join.
> >>>>>>>>> "Grace
> >>>>>>>>>>>         period" in the context of versioned stores refers to
> how
> >>> far
> >>>>> back
> >>>>>>>>> in time
> >>>>>>>>>>>         out-of-order writes may occur, which probably isn't
> >>> directly
> >>>>>>>>> relevant for
> >>>>>>>>>>>         introducing a stream-side buffer, though it's also
> >>>>>>>>>>> possible
> >>>>> I've
> >>>>>>>>> overlooked
> >>>>>>>>>>>         something. (As a bonus, switching from "table grace
> >>> period" in
> >>>>>>> the
> >>>>>>>>> KIP to
> >>>>>>>>>>>         "table history retention" also helps to
> >>>>>>>>>>> clarify/distinguish
> >>>>> that
> >>>>>>>>> it's a
> >>>>>>>>>>>         different parameter from the "join grace period,"
> >>>>>>>>>>> which I
> >>>>> could
> >>>>>>> see
> >>>>>>>>> being
> >>>>>>>>>>>         confusing to readers. :) )
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Victoria
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, May 18, 2023 at 1:43 PM Walker Carlson
> >>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hey all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the comments, they gave me a lot to think about.
> >>>>>>>>>>>> I'll
> >>>>> try
> >>>>>>> to
> >>>>>>>>>>>> address them all in order. I have made some updates to the kip
> >>>>> related
> >>>>>>>>> to
> >>>>>>>>>>>> them, but I mention where below.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Lucas
> >>>>>>>>>>>>
> >>>>>>>>>>>> Good idea about the example. I added a simple one.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) I have thought about including options for the underlying
> >>> buffer
> >>>>>>>>>>>> configuration. One of which might be adding an in memory
> >>>>>>>>>>>> option.
> >>> My
> >>>>>>>>> biggest
> >>>>>>>>>>>> concern is about the semantic guarantees. This isn't like
> >>> suppress
> >>>>> or
> >>>>>>>>> with
> >>>>>>>>>>>> windows where producing incomplete results is relatively
> >>>>> harmless.
> >>>>>>>>> Here
> >>>>>>>>>>>> we would be possibly producing incorrect results. I also would
> >>> like
> >>>>>>> to
> >>>>>>>>> keep
> >>>>>>>>>>>> the interface changes as simple as I can. Making more than
> this
> >>>>>>> change
> >>>>>>>>> to
> >>>>>>>>>>>> Joined I feel could make this more complicated than it needs
> to
> >>> be.
> >>>>>>> If
> >>>>>>>>> we
> >>>>>>>>>>>> really want to I could see adding a grace() option with a
> >>>>>>> BufferConfig
> >>>>>>>>> in
> >>>>>>>>>>>> there or something, but I would rather not.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2) The buffer will be independent of whether the table is
> >>>>>>>>>>>> versioned or
> >>>>>>> not.
> >>>>>>>>> If
> >>>>>>>>>>>> the table is not materialized, it will materialize it as
> >>>>>>>>>>>> versioned. It
> >>>>>>> might
> >>>>>>>>>>>> make sense to do a follow up kip where we force the retention
> >>>>> period
> >>>>>>>>> of
> >>>>>>>>>>>> the versioned table to be greater than whatever the max of the
> stream
> >>>>>>> buffer
> >>>>>>>>> is.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Victoria
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) Yes, records will exit in timestamp order not in offset
> >>>>>>>>>>>> order.
> >>>>>>>>>>>> 2) Late records will be dropped (late as in outside the grace
> >>> period).
> >>>>>>>>>     From my
> >>>>>>>>>>>> understanding that is the point of a grace period, no? Doesn't
> >>> the
> >>>>>>> same
> >>>>>>>>>>>> thing happen with versioned stores?
> >>>>>>>>>>>> 3) The segment store already has an observed stream time, we
> >>>>> advance
> >>>>>>>>> based
> >>>>>>>>>>>> on that. That should only advance based on records that
> >>>>>>>>>>>> enter the
> >>>>>>>>> store. So
> >>>>>>>>>>>> yes, only stream side records. We could maybe do an
> improvement
> >>>>> later
> >>>>>>>>> to
> >>>>>>>>>>>> advance stream time from table side as well, but that might be
> >>>>>>>>> debatable as
> >>>>>>>>>>>> we might get more late records. Anyways I would rather have
> >>>>>>>>>>>> that
> >>>>> as a
> >>>>>>>>>>>> separate discussion.
> >>>>>>>>>>>>
> >>>>>>>>>>>> in memory option? We can do that, for the buffer I plan to use
> >>> the
> >>>>>>>>>>>> TimeOrderedKeyValueBuffer interface which already has an in
> >>> memory
> >>>>>>>>>>>> implementation, so it would be simple.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I said more in my answer to Lucas's question. The concern I
> >>>>>>>>>>>> have
> >>>>> with
> >>>>>>>>>>>> buffer configs or in memory is complicating the interface.
> Also
> >>>>>>>>> semantic
> >>>>>>>>>>>> guarantees, but in memory shouldn't affect that.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) fixed out of order vs late terminology in the kip.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2) I was referring to having a stream. So after this kip we
> can
> >>>>> have
> >>>>>>> a
> >>>>>>>>>>>> buffered stream or a normal one. For the table we can use a
> >>>>> versioned
> >>>>>>>>> table
> >>>>>>>>>>>> or a normal table.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3) Good call out. I clarified this as "If the table side uses a
> >>>>>>>>> materialized
> >>>>>>>>>>>> version store, it can store multiple versions of each record
> >>> within
> >>>>>>> its
> >>>>>>>>>>>> defined grace period." and modified the rest of the paragraph
> a
> >>>>> bit.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 4) I get the preserving of offset ordering, but if the
> >>>>>>>>>>>> stream is
> >>>>>>>>> buffered
> >>>>>>>>>>>> to join on timestamp instead of offset doesn't it already seem
> >>> like
> >>>>>>> we
> >>>>>>>>> care
> >>>>>>>>>>>> more about time in this case?
> >>>>>>>>>>>>
> >>>>>>>>>>>> If we end up adding more options it might make sense to do
> >>>>>>>>>>>> this.
> >>>>>>> Maybe
> >>>>>>>>>>>> offset order processing can be a follow up?
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'll add a section for this in Rejected Alternatives. I
> >>>>>>>>>>>> think it
> >>>>>>> makes
> >>>>>>>>>>>> sense to do something like this but maybe in a follow up.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 5) I hadn't thought about this. I suppose if they changed
> >>>>>>>>>>>> this in
> >>>>> an
> >>>>>>>>>>>> upgrade the next record would either evict a lot of records
> (if
> >>> the
> >>>>>>>>> grace
> >>>>>>>>>>>> period decreased) or there would be a pause until the new
> grace
> >>>>>>> period
> >>>>>>>>>>>> reached. Increasing is a bit more problematic, especially if
> >>>>>>>>>>>> the
> >>>>>>> table
> >>>>>>>>>>>> grace period and retention time stay the same. If the data is
> >>>>>>>>> reprocessed
> >>>>>>>>>>>> after a change like that then there would be different
> results,
> >>>>> but I
> >>>>>>>>> feel
> >>>>>>>>>>>> like that would be expected after such a change.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What do you think should happen?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hopefully this answers your questions!
> >>>>>>>>>>>>
> >>>>>>>>>>>> Walker
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 8, 2023 at 11:32 AM Matthias J. Sax <
> >>> mj...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP! Also some question/comments from my side:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 10) Notation: you use the term "late data" but I think you
> >>>>>>>>>>>>> mean
> >>>>>>>>>>>>> out-of-order. We reserve the term "late" for records that
> >>>>>>>>>>>>> arrive
> >>>>>>> after
> >>>>>>>>>>>>> grace period passed, and thus, "late == out-of-order data
> that
> >>> is
> >>>>>>>>>>>> dropped".
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 20) "There is only one option from the stream side and only
> >>>>> recently
> >>>>>>>>> is
> >>>>>>>>>>>>> there a second option on the table side."
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What are those options? Victoria already asked about the
> table
> >>>>> side,
> >>>>>>>>> but
> >>>>>>>>>>>>> I am also not sure what option you mean for the stream side?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 30) "If the table side uses a materialized version store the
> >>> value
> >>>>>>> is
> >>>>>>>>>>>>> the latest by stream time rather than by offset within its
> >>> defined
> >>>>>>>>> grace
> >>>>>>>>>>>>> period."
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The phrase "the value is the latest by stream time" is
> >>>>>>>>>>>>> confusing
> >>>>> --
> >>>>>>> in
> >>>>>>>>>>>>> the end, a versioned store holds multiple versions, not just one.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 40) I am also wondering about ordering. In general, KS
> >>>>>>>>>>>>> tries to
> >>>>>>>>> preserve
> >>>>>>>>>>>>> offset-order during processing (with some exception, when
> >>>>>>>>>>>>> offset
> >>>>>>> order
> >>>>>>>>>>>>> preservation is not clearly defined). Given that the
> >>>>>>>>>>>>> stream-side
> >>>>>>>>> buffer
> >>>>>>>>>>>>> is really just a "linear buffer", we could easily preserve
> >>>>>>>>> offset-order.
> >>>>>>>>>>>>> But I also see a benefit of re-ordering and emitting
> >>> out-of-order
> >>>>>>> data
> >>>>>>>>>>>>> right away when read (instead of blocking them behind
> in-order
> >>>>>>> records
> >>>>>>>>>>>>> that are not ready yet). -- It might even be a possibility,
> to
> >>> let
> >>>>>>>>> users
> >>>>>>>>>>>>> pick an emit strategy, e.g., "EmitStrategy.preserveOffsets" (name
> >>> just
> >>>>> a
> >>>>>>>>>>>>> placeholder).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The KIP should explain this in more detail and also discuss
> >>>>>>> different
> >>>>>>>>>>>>> options and mention them in "Rejected alternatives" in case
> we
> >>>>> don't
> >>>>>>>>>>>>> want to include them.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 50) What happens when users change the grace period?
> >>>>>>>>>>>>> Especially,
> >>>>>>> when
> >>>>>>>>>>>>> they turn it on/off (but also increasing/decreasing is an
> >>>>>>> interesting
> >>>>>>>>>>>>> point)? I think we should try to support this if possible;
> the
> >>>>>>>>>>>>> "Compatibility" section needs to cover switching on/off in
> >>>>>>>>>>>>> more
> >>>>>>>>> detail.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/2/23 2:06 PM, Victoria Xia wrote:
> >>>>>>>>>>>>>> Cool KIP, Walker! Thanks for sharing this proposal.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> A few clarifications:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Is the order that records exit the buffer in
> >>>>>>>>>>>>>> necessarily the
> >>>>>>> same
> >>>>>>>>> as
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> order that records enter the buffer in, or no? Based on the
> >>>>>>>>> description
> >>>>>>>>>>>>> in
> >>>>>>>>>>>>>> the KIP, it sounds like the answer is no, i.e., records will
> >>> exit
> >>>>>>> the
> >>>>>>>>>>>>>> buffer in increasing timestamp order, which means that
> >>>>>>>>>>>>>> they may
> >>>>> be
> >>>>>>>>>>>>> reordered
> >>>>>>>>>>>>>> (even for the same key) compared to the input order.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. What happens if the join grace period is nonzero, and a
> >>>>>>>>> stream-side
> >>>>>>>>>>>>>> record arrives with a timestamp that is older than the
> >>>>>>>>>>>>>> current
> >>>>>>> stream
> >>>>>>>>>>>>> time
> >>>>>>>>>>>>>> minus the grace period? Will this record trigger a join
> >>>>>>>>>>>>>> result,
> >>>>> or
> >>>>>>>>> will
> >>>>>>>>>>>>> it
> >>>>>>>>>>>>>> be dropped? Based on the description for what happens when
> >>>>>>>>>>>>>> the
> >>>>> join
> >>>>>>>>>>>> grace
> >>>>>>>>>>>>>> period is set to zero, it sounds like the late record will
> be
> >>>>>>>>> dropped,
> >>>>>>>>>>>>> even
> >>>>>>>>>>>>>> if the join grace period is nonzero. Is that true?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 3. What could cause stream time to advance, for purposes of
> >>>>>>> removing
> >>>>>>>>>>>>>> records from the join buffer? For example, will new records
> >>>>>>> arriving
> >>>>>>>>> on
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>> table side of the join cause stream time to advance? From
> the
> >>> KIP
> >>>>>>> it
> >>>>>>>>>>>>> sounds
> >>>>>>>>>>>>>> like only stream-side records will advance stream time --
> >>>>>>>>>>>>>> does
> >>>>> that
> >>>>>>>>>>>> mean
> >>>>>>>>>>>>>> that the join processor itself will have to track this
> stream
> >>>>> time?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also +1 to Lucas's question about what options will be
> >>> available
> >>>>>>> for
> >>>>>>>>>>>>>> configuring the join buffer. Will users have the option to
> >>> choose
> >>>>>>>>>>>> whether
> >>>>>>>>>>>>>> they want the buffer to be in-memory vs persistent?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Victoria
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Apr 28, 2023 at 11:54 AM Lucas Brutschy
> >>>>>>>>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Walker,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> thanks for the KIP! We definitely need this. I have two
> >>>>> questions:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>        - Have you considered allowing the customization
> >>>>>>>>>>>>>>> of the
> >>>>>>>>> underlying
> >>>>>>>>>>>>>>> buffer implementation? As I can see, `StreamJoined` lets
> you
> >>>>>>>>> customize
> >>>>>>>>>>>>>>> the underlying store via a `WindowStoreSupplier`. Would it
> >>> make
> >>>>>>>>> sense
> >>>>>>>>>>>>>>> for `Joined` to have this as well? I can imagine one may
> >>>>>>>>>>>>>>> want
> >>> to
> >>>>>>>>> limit
> >>>>>>>>>>>>>>> the number of records in the buffer, for example. If we hit
> >>> the
> >>>>>>>>>>>>>>> maximum, the only option would be to drop semantic
> >>>>>>>>>>>>>>> guarantees,
> >>>>> but
> >>>>>>>>>>>>>>> users may still want to do this.
> >>>>>>>>>>>>>>>        - With "second option on the table side" you are
> >>> referring
> >>>>> to
> >>>>>>>>>>>>>>> versioned tables, right? Will the buffer on the stream side
> >>>>> behave
> >>>>>>>>> any
> >>>>>>>>>>>>>>> differently depending on whether the table side is versioned or not?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Finally, I think a simple example in the motivation section
> >>>>> could
> >>>>>>>>> help
> >>>>>>>>>>>>>>> non-experts understand the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Apr 25, 2023 at 9:13 PM Walker Carlson
> >>>>>>>>>>>>>>> <wcarl...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello everybody,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I have a stream proposal to improve the stream table
> >>>>>>>>>>>>>>>> join by
> >>>>>>>>> adding a
> >>>>>>>>>>>>>>> grace
> >>>>>>>>>>>>>>>> period and buffer to the stream side of the join to allow
> >>>>>>>>> processing
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> timestamp order matching the recent improvements of the
> >>>>> versioned
> >>>>>>>>>>>>> tables.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Please take a look here <
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/x/lAs0Dw>
> >>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>> share your thoughts.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> best,
> >>>>>>>>>>>>>>>> Walker
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
