Hi all,

Catching up on the recent revision — thanks Nick for picking this back
up, and Bill for the thorough pass. A few points below - sorry if I
repeat things, this has been discussed for a long time and my memory
is fading.

LB1: The doc string for statestore.uncommitted.max.bytes says "each
instance". I think this means `Kafka Streams instance`, but it could
be state store instance, stream thread instance etc. Worth making a
bit more precise. Existing statestore.cache.max.bytes is documented as
the budget "across all threads".

LB2: With the default StateStore#readOnly(IsolationLevel) returning
`this`, a READ_COMMITTED IQ against a custom non-transactional store
silently returns uncommitted data. That's a quiet correctness gap for
users mixing custom stores with the new IQ option. Two alternatives
worth considering: have the default throw
UnsupportedOperationException for READ_COMMITTED on stores that don't
override it, or expose a StateStore#supportedIsolationLevels() so the
IQ layer can fail fast (or fall back loudly). At minimum I think the
Javadoc on the default needs a strong note.

LB3: Do I understand ALOS semantics correctly? Presumably "visible
after the next commit completes", which is still a useful guarantee,
but worth making explicit in the KIP.

LB4: Could the KIP make explicit whether changelog restore writes go
through the transaction buffer?

LB5: On the note that "we should encourage users who enable
transactions to also set their default isolation level to
READ_COMMITTED" — rather than relying on documentation alone, could we
make the default of default.interactive.query.isolation.level
conditional on enable.transactional.statestores, defaulting to
READ_COMMITTED when transactions are enabled and READ_UNCOMMITTED
otherwise? We are doing "adaptive defaults" for commit.interval.ms as
well

Cheers,
Lucas

On Tue, May 5, 2026 at 4:24 PM Nick Telford <[email protected]> wrote:
>
> Hi Bill,
>
> I've updated the "State Store Caching" section to explain the main caveat
> (regarding memory accounting) and noted that we'll revisit this in the
> future.
>
> I believe all your other points have been addressed. Is there anything else
> you'd like me to clarify?
>
> Regards,
> Nick
>
> On Thu, 30 Apr 2026 at 15:09, Bill Bejeck <[email protected]> wrote:
>
> > >
> > >  I'm not sure there's a way to preserve the
> > > caching of intermediate writes for aggregations without effectively
> > > "double-buffering" writes.
> >
> >
> > Yes, I beggining to have the same opinion, we'll see as the development
> > progresses.
> >
> > -Bill
> >
> > On Thu, Apr 30, 2026 at 8:38 AM Nick Telford <[email protected]>
> > wrote:
> >
> > > Regarding the cache: if we disable write-caching when transactional
> > stores
> > > are enabled, that will disable the caching of intermediate values for
> > > aggregations, causing aggregation processors to run on every write.
> > > Depending on the application, this could be negligible (which I believe
> > > will be the case for basically any builtin aggregation), or it could be
> > > significant (if the aggregation processor does anything particularly
> > > heavyweight, like I/O).
> > >
> > > Is this an acceptable trade-off? I'm not sure there's a way to preserve
> > the
> > > caching of intermediate writes for aggregations without effectively
> > > "double-buffering" writes.
> > >
> > > Could we also add the cache's "flush to downstream processors only on
> > > commit" behaviour to our transaction buffers? My concern with that is
> > that
> > > our transaction buffering requires a WriteLock during commit, to prevent
> > IQ
> > > threads seeing an inconsistent view of the store. Therefore, store commit
> > > should not block while awaiting extensive downstream processing.
> > >
> > > Regards,
> > > Nick
> > >
> > > On Thu, 30 Apr 2026 at 10:34, Nick Telford <[email protected]>
> > wrote:
> > >
> > > > Thanks Bill!
> > > >
> > > > I've updated the KIP as requested:
> > > >
> > >
> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-892*3A*Transactional*Semantics*for*StateStores__;JSsrKys!!Ayb5sqE7!oqZIe0km6lX3ZrJXM7VdP4PhGrvof6HybQe--v1FjPzrJ-vquzvPVhiybq-P5Phwrw0kLJE9i5tGHOOw_cL-zPoQI-0$
> > > >
> > > > 1. Updated the section on thread safety with specifics about
> > RocksDBStore
> > > > and InMemory stores.
> > > > 2. Added a section on State Store Caching
> > > >
> > > > Let me know what you think, especially about the caching section, and
> > if
> > > > there's any other changes you'd like to see.
> > > >
> > > > Cheers,
> > > > Nick
> > > >
> > > > On Wed, 29 Apr 2026 at 21:57, Bill Bejeck <[email protected]> wrote:
> > > >
> > > >> Thanks for the responses Nick.
> > > >>
> > > >> >BB1 fair enough.
> > > >>
> > > >> >BB2 - can we update the transactional thread safety part of the KIP
> > > with
> > > >> the response you provided here?
> > > >>
> > > >> >BB3b - I think that approach will work.  My main concern is to
> > provide
> > > a
> > > >> clear picture of the memory used and what's the interaction with both
> > > >> caches and flushing.
> > > >>
> > > >> On Tue, Apr 21, 2026 at 10:52 AM Nick Telford <[email protected]
> > >
> > > >> wrote:
> > > >>
> > > >> > BB3b: Thinking a bit more about the cache vs. transaction buffer -
> > > what
> > > >> if
> > > >> > we make a subtle change to the cache: when
> > > >> enable.transactional.statestores
> > > >> > = true, we no longer cache writes (put/delete), only reads. Since
> > the
> > > >> > transaction buffer will already be storing recently written records
> > > (at
> > > >> > least until commit), we don't need to keep them in another cache. We
> > > >> would
> > > >> > still cache records read *from* the transaction buffer (because we
> > > would
> > > >> > take the read of a recently written record as a signal that it's
> > worth
> > > >> > caching). The only caveat to this would be that the transaction
> > buffer
> > > >> > flushes recently written records on-commit, so cache hit-rate of
> > > >> recently
> > > >> > written records will dip during commits.
> > > >> >
> > > >> > What do you think?
> > > >> >
> > > >> > On Tue, 21 Apr 2026 at 10:39, Nick Telford <[email protected]>
> > > >> wrote:
> > > >> >
> > > >> > > Hi Bill,
> > > >> > >
> > > >> > > Thanks for reviewing the updated KIP!
> > > >> > >
> > > >> > > BB1: I'm not quite sure I understand what you're saying here. We
> > > need
> > > >> to
> > > >> > > take a copy/snapshot of the read buffer because the read buffer is
> > > >> > emptied
> > > >> > > on-commit, and any Iterator created on a ConcurrentSkipListMap
> > > >> > > automatically reflects those changes, so would terminate iteration
> > > >> > > prematurely. Also, any new writes to the buffer (put/delete) would
> > > be
> > > >> > > immediately reflected in the iterator, making its results
> > > >> inconsistent.
> > > >> > > TL;DR: copying it gives us full snapshot isolation for IQ
> > iterators.
> > > >> > > FWIW, the implementation already takes an Iterator over the
> > > underlying
> > > >> > > store, as well as the buffer, and merges the two iterators
> > together.
> > > >> But
> > > >> > > the read buffer's Iterator must be over a copy of the read buffer,
> > > to
> > > >> > > provide a consistent snapshot of the store.
> > > >> > >
> > > >> > > BB2: Basically, the RocksDBTransactionBuffer will maintain an
> > extra
> > > >> > "write
> > > >> > > buffer", which uses a RocksDB WriteBatch to stage writes during
> > > >> > > put/delete. This yields better throughput than iterating the
> > > >> > > ConcurrentSkipListMap and writing to a WriteBatch during commit,
> > and
> > > >> > > reduces the time that IQ threads (that need a readLock on the
> > > >> > > ConcurrentSkipListMap) are blocked from creating iterators during
> > > >> commit.
> > > >> > >
> > > >> > > InMemoryTransactionBuffers do not need this extra "write buffer",
> > as
> > > >> > there
> > > >> > > is no more efficient implementation for them to use. This means
> > > that,
> > > >> vs.
> > > >> > > RocksDB, InMemoryStores will actually use less memory buffering
> > > >> records.
> > > >> > >
> > > >> > > BB3: This is a good question, and one that I've not really dug too
> > > >> deeply
> > > >> > > into. I suspect that there's a level of duplication of concerns
> > > here,
> > > >> but
> > > >> > > there are subtle differences: the cache is an LRU cache, which can
> > > >> cache
> > > >> > > records across multiple commits (if they're frequently read, for
> > > >> > example),
> > > >> > > whereas the transaction buffer always exclusively contains
> > > uncommitted
> > > >> > > writes.
> > > >> > >
> > > >> > > In terms of correctness/consistency, I don't think we have a
> > > problem.
> > > >> For
> > > >> > > READ_COMMITTED IQ, we skip both the cache and transaction buffers.
> > > For
> > > >> > > READ_UNCOMMITTED IQ (and StreamThread reads) we read in the order
> > > >> > dictated
> > > >> > > by the StateStore layering, which is: cache first, then
> > transaction
> > > >> > buffer,
> > > >> > > then underlying store. Since writes always update the cache, this
> > is
> > > >> > > correct. The one wrinkle is that cached writes are only written to
> > > >> > > downstream store layers (and therefore buffered by the transaction
> > > >> > buffer)
> > > >> > > when the entry is evicted, or the cache is flushed or committed.
> > > This
> > > >> > > essentially double-buffers writes, which is a bit wasteful of
> > > memory.
> > > >> > >
> > > >> > > I don't really know much about the cache, but as far as I can tell
> > > it
> > > >> > > might eliminate some (de)serialization overhead? Regardless, given
> > > >> that
> > > >> > it
> > > >> > > implements a different behaviour (LRU vs. buffering uncommitted
> > > >> writes),
> > > >> > > eliminating it might cause some subtle performance regressions for
> > > >> *some*
> > > >> > > users.
> > > >> > >
> > > >> > > Maybe instead we have the *default* cache size change to 0 when
> > > >> > > enable.transactional.statestores = true and
> > > >> > > statestore.uncommitted.max.bytes > 0? That would reduce memory
> > usage
> > > >> for
> > > >> > > users that aren't explicitly relying on the cache.
> > > >> > >
> > > >> > > BB4: Ahh yes, good catch! This is showing how old this KIP is :-)
> > > >> > >
> > > >> > > Regards,
> > > >> > > Nick
> > > >> > >
> > > >> > > On Mon, 20 Apr 2026 at 21:11, Bill Bejeck <[email protected]>
> > > wrote:
> > > >> > >
> > > >> > >> Hi Nick,
> > > >> > >>
> > > >> > >> Thanks for the update to the KIP!  It's great to see this moving
> > > >> again.
> > > >> > >> Overall the KIP updates LGTM, but I do have a couple of comments
> > > >> > >>
> > > >> > >> BB1- The KIP specificies using a ConcurrentSkipListMap for the
> > > >> > >> transactional buffer and for IQ READ_UNCOMMITTED queries KS will
> > > >> take a
> > > >> > >> snapshot and during the copy lock the StreamThread with
> > > >> > >> RenetrantReadWriteLock.
> > > >> > >> Now if the IQ queries will only be served from the in-memory
> > > >> > transactional
> > > >> > >> buffer, that's an acceptable trade-off since even though the
> > > >> > >> ConcurrentSkipListMap uses weakly consistent iterators, a commit
> > > >> > >> mid-iteration would mean those records not viewed as a result of
> > > >> > clearing
> > > >> > >> the buffer.  Would it be worth exploring using IQ against the
> > store
> > > >> and
> > > >> > >> the
> > > >> > >> buffer thus removing the copy+locking?  I think this could be
> > > >> > >> worthwhile (unless just technically unfeasible) as the way it
> > > stands
> > > >> > users
> > > >> > >> will have to possibly make tradeoffs in performance by adjusting
> > > >> either
> > > >> > >> the
> > > >> > >> commit interval or the size of uncommitted bytes. Of course as
> > > >> specified
> > > >> > >> in
> > > >> > >> the KIP they have the option of going with READ_COMMITTED.
> > > >> > >>
> > > >> > >> BB2 - In the KIP section "Transaction Buffer Thread Safety"
> > there's
> > > >> this
> > > >> > >> sentence which I find confusing:
> > > >> > >>
> > > >> > >> > Reads from the StreamThread and READ_UNCOMMITTED IQ threads are
> > > >> > serviced
> > > >> > >> > by the read buffer. The write buffer is (optionally) used to
> > > buffer
> > > >> > >> > writes for commit to the database, if doing so would yield
> > better
> > > >> > >> > performance than using the read buffer.
> > > >> > >>
> > > >> > >> Can you clarify what this means?
> > > >> > >>
> > > >> > >> BB3 - How does the `statestore.uncommitted.max.bytes` work in
> > > >> > >> conjuction with the `statestore.cache.max.bytes.buffering`
> > config?
> > > >> If
> > > >> > >> users elect to go with transactional stores is the statestore
> > cache
> > > >> > still
> > > >> > >> needed?  Since the statestore cache flushes on commit() as well
> > as
> > > >> the
> > > >> > >> transactional cache what's the workflow like between the two?
> > The
> > > >> > default
> > > >> > >> size of the store cache is 10M so if it becomes full and a commit
> > > is
> > > >> not
> > > >> > >> called  would it empty into the transactional cache?
> > > >> > >>
> > > >> > >> BB4- In discussing the `enable.transactional.statestores` config
> > it
> > > >> > >> mentions "exactly-once, exactly-once-v2 or exactly-once-beta" but
> > > we
> > > >> can
> > > >> > >> go
> > > >> > >> with just `exactly-once-v2` , we removed the others since 4.0
> > > >> > >>
> > > >> > >> Thanks!
> > > >> > >> Bill
> > > >> > >>
> > > >> > >>
> > > >> > >> On Fri, Apr 17, 2026 at 1:13 PM Nick Telford <
> > > [email protected]
> > > >> >
> > > >> > >> wrote:
> > > >> > >>
> > > >> > >> > Hi everyone,
> > > >> > >> >
> > > >> > >> > We're circling back to KIP-892, with the goal of including it
> > in
> > > >> 4.4.
> > > >> > To
> > > >> > >> > that end, I've updated the KIP with substantial design
> > > >> improvements:
> > > >> > >> >
> > > >> > >> >
> > > >> > >>
> > > >> >
> > > >>
> > >
> > https://urldefense.com/v3/__https://cwiki.apache.org/confluence/display/KAFKA/KIP-892:*Transactional*Semantics*for*StateStores__;KysrKw!!Ayb5sqE7!oqZIe0km6lX3ZrJXM7VdP4PhGrvof6HybQe--v1FjPzrJ-vquzvPVhiybq-P5Phwrw0kLJE9i5tGHOOw_cL-Ekm9I5E$
> > > >> > >> >
> > > >> > >> > Highlights:
> > > >> > >> >
> > > >> > >> >    - TransactionBuffers that support both RocksDB and InMemory
> > > >> stores
> > > >> > >> >    - Support for custom IsolationLevel of Interactive Queries,
> > > >> > >> including a
> > > >> > >> >    default (IQv1 and IQv2) and a per-request isolation level
> > > (IQv2
> > > >> > only)
> > > >> > >> >    - New store-level metric for uncommitted-bytes
> > > >> > >> >
> > > >> > >> > Let me know what you think!
> > > >> > >> >
> > > >> > >> > Regards,
> > > >> > >> > Nick
> > > >> > >> >
> > > >> > >> > On Wed, 17 Apr 2024 at 11:50, Nick Telford <
> > > [email protected]
> > > >> >
> > > >> > >> wrote:
> > > >> > >> >
> > > >> > >> > > Hi Walker,
> > > >> > >> > >
> > > >> > >> > > Feel free to ask away, either on the mailing list of the
> > > >> Confluent
> > > >> > >> > > Community Slack, where I hang out :-)
> > > >> > >> > >
> > > >> > >> > > The implementation is *mostly* complete, although it needs
> > some
> > > >> > >> > polishing.
> > > >> > >> > > It's worth noting that KIP-1035 is a hard prerequisite for
> > > this.
> > > >> > >> > >
> > > >> > >> > > Regards,
> > > >> > >> > > Nick
> > > >> > >> > >
> > > >> > >> >
> > > >> > >>
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >

Reply via email to