Hi everyone,

I've updated the KIP with a more detailed design, which reflects the
implementation I've been working on:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores

This new design should address the outstanding points already made in the
thread.

Please let me know if there are areas that are unclear or need more
clarification.

I have a (nearly) working implementation. I'm confident that the remaining
work (making Segments behave) will not impact the documented design.

Regards,

Nick

On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote:

> Nick,
>
> Thank you for the reply; that makes sense. I was hoping that, since reading
> uncommitted records from IQ in EOS isn't part of the documented API, maybe
> you *wouldn't* have to wait for the next major release to make that change;
> but given that it would be considered a major change, I like your approach
> the best.
>
> Wishing you a speedy recovery and happy coding!
>
> Thanks,
> Colt McNealy
> *Founder, LittleHorse.io*
>
>
> On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com>
> wrote:
>
> > Hi Colt,
> >
> > 10: Yes, I agree it's not ideal. I originally intended to try to keep the
> > behaviour unchanged as much as possible, otherwise we'd have to wait for a
> > major version release to land these changes.
> > 20: Good point, ALOS doesn't need the same level of guarantee, and the
> > typically longer commit intervals would be problematic when reading only
> > "committed" records.
> >
> > I've been away for 5 days recovering from minor surgery, but I spent a
> > considerable amount of that time working through ideas for possible
> > solutions in my head. I think your suggestion of keeping ALOS as-is, but
> > buffering writes for EOS is the right path forwards, although I have a
> > solution that both expands on this, and provides for some more formal
> > guarantees.
> >
> > Essentially, adding support to KeyValueStores for "Transactions", with
> > clearly defined IsolationLevels. Using "Read Committed" when under EOS, and
> > "Read Uncommitted" under ALOS.
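To make the two isolation levels concrete, here is a minimal plain-Java sketch. It is not the KIP-892 implementation: `IsolationSketch`, `BufferedStore` and `forProcessingMode` are hypothetical names, and the two in-memory maps merely stand in for the RocksDB store and its write batch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustration only; every name here is hypothetical, not the KIP-892 API. */
class IsolationSketch {
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    /** Assumed mapping from the thread: EOS reads committed, ALOS reads uncommitted. */
    static IsolationLevel forProcessingMode(boolean exactlyOnce) {
        return exactlyOnce ? IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

    static final class BufferedStore {
        // Stand-in for the committed data in the underlying store.
        private final Map<String, String> committed = new HashMap<>();
        // Stand-in for the write batch; Optional.empty() marks a pending delete.
        private final Map<String, Optional<String>> uncommitted = new HashMap<>();

        void put(String key, String value) {
            uncommitted.put(key, Optional.of(value));
        }

        void delete(String key) {
            uncommitted.put(key, Optional.empty());
        }

        /** READ_UNCOMMITTED consults the buffer first; READ_COMMITTED ignores it. */
        String get(String key, IsolationLevel level) {
            if (level == IsolationLevel.READ_UNCOMMITTED && uncommitted.containsKey(key)) {
                return uncommitted.get(key).orElse(null);
            }
            return committed.get(key);
        }

        /** Applies all buffered writes and deletes, then clears the buffer. */
        void commit() {
            uncommitted.forEach((key, value) -> {
                if (value.isPresent()) {
                    committed.put(key, value.get());
                } else {
                    committed.remove(key);
                }
            });
            uncommitted.clear();
        }
    }
}
```

With this shape, a query under "Read Committed" never observes a buffered write until `commit()` has run, while "Read Uncommitted" sees it immediately.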
> >
> > The nice thing about this approach is that it gives us much more clearly
> > defined isolation behaviour that can be properly documented to ensure users
> > know what to expect.
> >
> > I'm still working out the kinks in the design, and will update the KIP when
> > I have something. The main struggle is trying to implement this without
> > making any major changes to the existing interfaces or breaking existing
> > implementations, because currently everything expects to operate directly
> > on a StateStore, and not a Transaction of that store. I think I'm getting
> > close, although sadly I won't be able to progress much until next week due
> > to some work commitments.
> >
> > Regards,
> > Nick
> >
> > On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io> wrote:
> >
> > > Nick,
> > >
> > > Thank you for the explanation, and also for the updated KIP. I am quite
> > > eager for this improvement to be released as it would greatly reduce the
> > > operational difficulties of EOS streams apps.
> > >
> > > Two questions:
> > >
> > > 10)
> > > >When reading records, we will use the
> > > >WriteBatchWithIndex#getFromBatchAndDB and
> > > >WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure
> > > >that uncommitted writes are available to query.
> > > Why do extra work to enable the reading of uncommitted writes during IQ?
> > > Code complexity aside, reading uncommitted writes is, in my opinion, a
> > > minor flaw in EOS IQ; it would be very nice to have the guarantee that,
> > > with EOS, IQ only reads committed records. In order to avoid dirty reads,
> > > one currently must query a standby replica (but this still doesn't fully
> > > guarantee monotonic reads).
> > >
> > > 20) Is it also necessary to enable this optimization on ALOS stores? The
> > > motivation of KIP-844 was mainly to reduce the need to restore state from
> > > scratch on unclean EOS shutdowns; with ALOS it was acceptable to accept
> > > that there may have been uncommitted writes on disk. On a side note, if you
> > > enable this type of store on ALOS processors, the community would
> > > definitely want to enable queries on dirty reads; otherwise users would
> > > have to wait 30 seconds (default) to see an update.
> > >
> > > Thank you for doing this fantastic work!
> > > Colt McNealy
> > > *Founder, LittleHorse.io*
> > >
> > >
> > > On Wed, Nov 30, 2022 at 10:44 AM Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I've drastically reduced the scope of this KIP to no longer include the
> > > > StateStore management of checkpointing. This can be added as a KIP later
> > > > on to further optimize the consistency and performance of state stores.
> > > >
> > > > I've also added a section discussing some of the concerns around
> > > > concurrency, especially in the presence of Iterators. I'm thinking of
> > > > wrapping WriteBatchWithIndex with a reference-counting copy-on-write
> > > > implementation (that only makes a copy if there's an active iterator),
> > > > but I'm open to suggestions.
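One way to picture the reference-counting copy-on-write idea is the plain-Java sketch below. It is an illustration under assumptions, not the wrapper from the KIP: a `TreeMap` stands in for WriteBatchWithIndex, and `CowBufferSketch`, `PinnedIterator` and `copiesMade` are hypothetical names.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Illustration only: a TreeMap stands in for the write batch; all names are hypothetical. */
class CowBufferSketch {
    /** One immutable-while-pinned snapshot of the buffer plus its iterator count. */
    private static final class Version {
        final TreeMap<String, String> data;
        int openIterators = 0;

        Version(TreeMap<String, String> data) {
            this.data = data;
        }
    }

    /** Iterator that pins the version it was opened on until it is closed. */
    final class PinnedIterator implements Iterator<Map.Entry<String, String>>, AutoCloseable {
        private final Version version;
        private final Iterator<Map.Entry<String, String>> delegate;
        private boolean closed = false;

        PinnedIterator(Version version) {
            this.version = version;
            this.delegate = version.data.entrySet().iterator();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public Map.Entry<String, String> next() { return delegate.next(); }

        @Override public void close() {
            synchronized (CowBufferSketch.this) {
                if (!closed) {
                    closed = true;
                    version.openIterators--; // release the pin exactly once
                }
            }
        }
    }

    private Version current = new Version(new TreeMap<>());
    private int copies = 0;

    /** Copies the buffer first only if an open iterator still reads the current version. */
    synchronized void put(String key, String value) {
        if (current.openIterators > 0) {
            current = new Version(new TreeMap<>(current.data));
            copies++;
        }
        current.data.put(key, value);
    }

    synchronized PinnedIterator iterator() {
        current.openIterators++;
        return new PinnedIterator(current);
    }

    synchronized int copiesMade() {
        return copies;
    }
}
```

Writes pay for a copy only while an iterator is open on the current version; once every iterator on it is closed, later writes mutate the buffer in place.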
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > On Mon, 28 Nov 2022 at 16:36, Nick Telford <nick.telf...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Colt,
> > > > >
> > > > > I didn't do any profiling, but the 844 implementation:
> > > > >
> > > > >    - Writes uncommitted records to a temporary RocksDB instance
> > > > >       - Since tombstones need to be flagged, all record values are
> > > > >       prefixed with a value/tombstone marker. This necessitates a
> > > > >       memory copy.
> > > > >    - On-commit, iterates all records in this temporary instance and
> > > > >    writes them to the main RocksDB store.
> > > > >    - While iterating, the value/tombstone marker needs to be parsed and
> > > > >    the real value extracted. This necessitates another memory copy.
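The two memory copies described in the list above can be sketched in plain Java as follows. This only illustrates the general marker-prefix technique; the marker byte values and the names `MarkerPrefixSketch`, `encode` and `decode` are assumptions, not taken from the KIP-844 code.

```java
import java.util.Arrays;

/** Illustration only: marker values and method names are assumptions. */
class MarkerPrefixSketch {
    static final byte TOMBSTONE = 0;
    static final byte VALUE = 1;

    /** Prefixes the value with a marker byte; a null value becomes a bare tombstone. */
    static byte[] encode(byte[] valueOrNull) {
        if (valueOrNull == null) {
            return new byte[] {TOMBSTONE};
        }
        byte[] stored = new byte[valueOrNull.length + 1];
        stored[0] = VALUE;
        // First copy: the whole value is moved behind the marker byte.
        System.arraycopy(valueOrNull, 0, stored, 1, valueOrNull.length);
        return stored;
    }

    /** Strips the marker byte again; returns null for a tombstone. */
    static byte[] decode(byte[] stored) {
        if (stored[0] == TOMBSTONE) {
            return null;
        }
        // Second copy: the real value is extracted from behind the marker.
        return Arrays.copyOfRange(stored, 1, stored.length);
    }
}
```

Every record therefore pays for one allocation and copy on write and another on commit, on top of the iteration over the temporary store.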
> > > > >
> > > > > My guess is that the cost of iterating the temporary RocksDB store is the
> > > > > major factor, with the 2 extra memory copies per-Record contributing a
> > > > > significant amount too.
> > > > >
> > > > > Regards,
> > > > > Nick
> > > > >
> > > > > On Mon, 28 Nov 2022 at 16:12, Colt McNealy <c...@littlehorse.io>
> > > > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Out of curiosity, why does the performance of the store degrade so
> > > > >> significantly with the 844 implementation? I wouldn't be too surprised by
> > > > >> a 50-60% drop (caused by each record being written twice), but 96% is
> > > > >> extreme.
> > > > >>
> > > > >> The only thing I can think of which could create such a bottleneck would
> > > > >> be that perhaps the 844 implementation deserializes and then re-serializes
> > > > >> the store values when copying from the uncommitted to committed store, but
> > > > >> I wasn't able to figure that out when I scanned the PR.
> > > > >>
> > > > >> Colt McNealy
> > > > >> *Founder, LittleHorse.io*
> > > > >>
> > > > >>
> > > > >> On Mon, Nov 28, 2022 at 7:56 AM Nick Telford <nick.telf...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hi everyone,
> > > > >> >
> > > > >> > I've updated the KIP to resolve all the points that have been raised so
> > > > >> > far, with one exception: the ALOS default commit interval of 5 minutes is
> > > > >> > likely to cause WriteBatchWithIndex memory to grow too large.
> > > > >> >
> > > > >> > There are a couple of different things I can think of to solve this:
> > > > >> >
> > > > >> >    - We already have a memory/record limit in the KIP to prevent OOM
> > > > >> >    errors. Should we choose a default value for these? My concern here is
> > > > >> >    that anything we choose might seem rather arbitrary. We could change
> > > > >> >    its behaviour such that under ALOS, it only triggers the commit of the
> > > > >> >    StateStore, but under EOS, it triggers a commit of the Kafka
> > > > >> >    transaction.
> > > > >> >    - We could introduce a separate `checkpoint.interval.ms` to allow ALOS
> > > > >> >    to commit the StateStores more frequently than the general
> > > > >> >    commit.interval.ms? My concern here is that the semantics of this
> > > > >> >    config would depend on the processing.mode; under ALOS it would allow
> > > > >> >    more frequently committing stores, whereas under EOS it couldn't.
> > > > >> >
> > > > >> > Any better ideas?
> > > > >> >
> > > > >> > On Wed, 23 Nov 2022 at 16:25, Nick Telford <nick.telf...@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Alex,
> > > > >> > >
> > > > >> > > Thanks for the feedback.
> > > > >> > >
> > > > >> > > I've updated the discussion of OOM issues by describing how we'll handle
> > > > >> > > it. Here's the new text:
> > > > >> > >
> > > > >> > >> To mitigate this, we will automatically force a Task commit if the total
> > > > >> > >> uncommitted records returned by
> > > > >> > >> StateStore#approximateNumUncommittedEntries() exceeds a threshold,
> > > > >> > >> configured by max.uncommitted.state.entries.per.task; or the total
> > > > >> > >> memory used for buffering uncommitted records returned by
> > > > >> > >> StateStore#approximateNumUncommittedBytes() exceeds the threshold
> > > > >> > >> configured by max.uncommitted.state.bytes.per.task. This will roughly
> > > > >> > >> bound the memory required per-Task for buffering uncommitted records,
> > > > >> > >> irrespective of the commit.interval.ms, and will effectively bound the
> > > > >> > >> number of records that will need to be restored in the event of a
> > > > >> > >> failure.
> > > > >> > >>
> > > > >> > >> These limits will be checked in StreamTask#process and a premature commit
> > > > >> > >> will be requested via Task#requestCommit().
> > > > >> > >>
> > > > >> > >> Note that these new methods provide default implementations that ensure
> > > > >> > >> existing custom stores and non-transactional stores (e.g.
> > > > >> > >> InMemoryKeyValueStore) do not force any early commits.
> > > > >> > >
> > > > >> > >
> > > > >> > > I've chosen to have the StateStore expose approximations of its buffer
> > > > >> > > size/count instead of opaquely requesting a commit in order to delegate
> > > > >> > > the decision making to the Task itself. This enables Tasks to look at
> > > > >> > > *all* of their StateStores, and determine whether an early commit is
> > > > >> > > necessary. Notably, it enables per-Task thresholds, instead of per-Store,
> > > > >> > > which prevents Tasks with many StateStores from using much more memory
> > > > >> > > than Tasks with one StateStore. This makes sense, since commits are done
> > > > >> > > by-Task, not by-Store.
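The per-Task check described above can be sketched in plain Java as follows. This is a sketch under assumptions, not the Kafka Streams code: `BufferingStore` is a stand-in for the two quoted StateStore methods, the default return value of 0 is assumed (the thread only says the defaults never force an early commit), and `earlyCommitNeeded` and `fixed` are hypothetical helpers.

```java
import java.util.List;

/** Illustration only; not the real StateStore or StreamTask code. */
class CommitThresholdSketch {
    /** Stand-in for the two methods quoted above. */
    interface BufferingStore {
        // Assumed default: report nothing buffered, so such stores never force a commit.
        default long approximateNumUncommittedEntries() { return 0L; }
        default long approximateNumUncommittedBytes() { return 0L; }
    }

    /**
     * Sums the approximations over ALL of a Task's stores and compares the totals
     * with the per-Task limits (stand-ins for max.uncommitted.state.entries.per.task
     * and max.uncommitted.state.bytes.per.task).
     */
    static boolean earlyCommitNeeded(List<? extends BufferingStore> stores,
                                     long maxEntriesPerTask,
                                     long maxBytesPerTask) {
        long entries = 0L;
        long bytes = 0L;
        for (BufferingStore store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxEntriesPerTask || bytes > maxBytesPerTask;
    }

    /** Hypothetical helper: a store reporting fixed buffer sizes. */
    static BufferingStore fixed(long entries, long bytes) {
        return new BufferingStore() {
            @Override public long approximateNumUncommittedEntries() { return entries; }
            @Override public long approximateNumUncommittedBytes() { return bytes; }
        };
    }
}
```

Because the totals are summed across stores, two stores that are each under the limit can still trigger an early commit together, which is the point of a per-Task rather than per-Store threshold.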
> > > > >> > >
> > > > >> > > Prizes* for anyone who can come up with a better name for the new config
> > > > >> > > properties!
> > > > >> > >
> > > > >> > > Thanks for pointing out the potential performance issues of WBWI. From
> > > > >> > > the benchmarks that user posted[1], it looks like WBWI still performs
> > > > >> > > considerably better than individual puts, which is the existing design,
> > > > >> > > so I'd actually expect a performance boost from WBWI, just not as great
> > > > >> > > as we'd get from a plain WriteBatch. This does suggest that a good
> > > > >> > > optimization would be to use a regular WriteBatch for restoration (in
> > > > >> > > RocksDBStore#restoreBatch), since we know that those records will never
> > > > >> > > be queried before they're committed.
> > > > >> > >
> > > > >> > > 1:
> > > > >> > > https://github.com/adamretter/rocksjava-write-methods-benchmark#results
> > > > >> > >
> > > > >> > > * Just kidding, no prizes, sadly.
> > > > >> > >
> > > > >> > > On Wed, 23 Nov 2022 at 12:28, Alexander Sorokoumov
> > > > >> > > <asorokou...@confluent.io.invalid> wrote:
> > > > >> > >
> > > > >> > >> Hey Nick,
> > > > >> > >>
> > > > >> > >> Thank you for the KIP! With such a significant performance degradation in
> > > > >> > >> the secondary store approach, we should definitely consider
> > > > >> > >> WriteBatchWithIndex. I also like encapsulating checkpointing inside the
> > > > >> > >> default state store implementation to improve performance.
> > > > >> > >>
> > > > >> > >> +1 to John's comment to keep the current checkpointing as a fallback
> > > > >> > >> mechanism. We want to keep existing users' workflows intact if we can. A
> > > > >> > >> non-intrusive way would be to add a separate StateStore method, say,
> > > > >> > >> StateStore#managesCheckpointing(), that controls whether the state store
> > > > >> > >> implementation owns checkpointing.
> > > > >> > >>
> > > > >> > >> I think that a solution to the transactional writes should address the
> > > > >> > >> OOMEs. One possible way to address that is to wire StateStore's commit
> > > > >> > >> request by adding, say, StateStore#commitNeeded that is checked in
> > > > >> > >> StreamTask#commitNeeded via the corresponding ProcessorStateManager. With
> > > > >> > >> that change, RocksDBStore will have to track the current transaction size
> > > > >> > >> and request a commit when the size goes over a (configurable) threshold.
> > > > >> > >>
> > > > >> > >> AFAIU WriteBatchWithIndex might perform significantly slower than non-txn
> > > > >> > >> puts as the batch size grows [1]. We should have a configuration to fall
> > > > >> > >> back to the current behavior (and/or disable txn stores for ALOS) unless
> > > > >> > >> the benchmarks show negligible overhead for longer commits / large-enough
> > > > >> > >> batch sizes.
> > > > >> > >>
> > > > >> > >> If you prefer to keep the KIP smaller, I would cut out
> > > > >> > >> state-store-managed checkpointing rather than proper OOMe handling and
> > > > >> > >> being able to switch to non-txn behavior. The checkpointing is not
> > > > >> > >> necessary to solve the recovery-under-EOS problem. On the other hand, once
> > > > >> > >> WriteBatchWithIndex is in, it will be much easier to add
> > > > >> > >> state-store-managed checkpointing.
> > > > >> > >>
> > > > >> > >> If you share the current implementation, I am happy to help you address
> > > > >> > >> the OOMe and configuration parts as well as review and test the patch.
> > > > >> > >>
> > > > >> > >> Best,
> > > > >> > >> Alex
> > > > >> > >>
> > > > >> > >>
> > > > >> > >> 1. https://github.com/facebook/rocksdb/issues/608
> > > > >> > >>
> > > > >> > >> On Tue, Nov 22, 2022 at 6:31 PM Nick Telford <nick.telf...@gmail.com>
> > > > >> > >> wrote:
> > > > >> > >>
> > > > >> > >> > Hi John,
> > > > >> > >> >
> > > > >> > >> > Thanks for the review and feedback!
> > > > >> > >> >
> > > > >> > >> > 1. Custom Stores: I've been mulling over this problem myself. As it
> > > > >> > >> > stands, custom stores would essentially lose checkpointing with no
> > > > >> > >> > indication that they're expected to make changes, besides a line in the
> > > > >> > >> > release notes. I agree that the best solution would be to provide a
> > > > >> > >> > default that checkpoints to a file. The one thing I would change is that
> > > > >> > >> > the checkpointing is to a store-local file, instead of a per-Task file.
> > > > >> > >> > This way the StateStore still technically owns its own checkpointing (via
> > > > >> > >> > a default implementation), and the StateManager/Task execution engine
> > > > >> > >> > doesn't need to know anything about checkpointing, which greatly
> > > > >> > >> > simplifies some of the logic.
> > > > >> > >> >
> > > > >> > >> > 2. OOME errors: The main reasons why I didn't explore a solution to this
> > > > >> > >> > are a) to keep this KIP as simple as possible, and b) because I'm not
> > > > >> > >> > exactly sure how to signal that a Task should commit prematurely. I'm
> > > > >> > >> > confident it's possible, and I think it's worth adding a section on
> > > > >> > >> > handling this. Besides my proposal to force an early commit once memory
> > > > >> > >> > usage reaches a threshold, is there any other approach that you might
> > > > >> > >> > suggest for tackling this problem?
> > > > >> > >> >
> > > > >> > >> > 3. ALOS: I can add in an explicit paragraph, but my assumption is that
> > > > >> > >> > since transactional behaviour comes at little/no cost, it should be
> > > > >> > >> > available by default on all stores, irrespective of the processing mode.
> > > > >> > >> > While ALOS doesn't use transactions, the Task itself still "commits", so
> > > > >> > >> > the behaviour should be correct under ALOS too. I'm not convinced that
> > > > >> > >> > it's worth having both transactional/non-transactional stores available,
> > > > >> > >> > as it would considerably increase the complexity of the codebase, for
> > > > >> > >> > very little benefit.
> > > > >> > >> >
> > > > >> > >> > 4. Method deprecation: Are you referring to StateStore#getPosition()? As I
> > > > >> > >> > understand it, Position contains the position of the *source* topics,
> > > > >> > >> > whereas the commit offsets would be the *changelog* offsets. So it's still
> > > > >> > >> > necessary to retain the Position data, as well as the changelog offsets.
> > > > >> > >> > What I meant in the KIP is that Position offsets are currently stored in a
> > > > >> > >> > file, and since we can atomically store metadata along with the record
> > > > >> > >> > batch we commit to RocksDB, we can move our Position offsets in to this
> > > > >> > >> > metadata too, and gain the same transactional guarantees that we will for
> > > > >> > >> > changelog offsets, ensuring that the Position offsets are consistent with
> > > > >> > >> > the records that are read from the database.
> > > > >> > >> >
> > > > >> > >> > Regards,
> > > > >> > >> > Nick
> > > > >> > >> >
> > > > >> > >> > On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:
> > > > >> > >> >
> > > > >> > >> > > Thanks for publishing this alternative, Nick!
> > > > >> > >> > >
> > > > >> > >> > > The benchmark you mentioned in the KIP-844 discussion seems like a
> > > > >> > >> > > compelling reason to revisit the built-in transactionality mechanism. I
> > > > >> > >> > > also appreciate your analysis, showing that for most use cases, the write
> > > > >> > >> > > batch approach should be just fine.
> > > > >> > >> > >
> > > > >> > >> > > There are a couple of points that would hold me back from approving this
> > > > >> > >> > > KIP right now:
> > > > >> > >> > >
> > > > >> > >> > > 1. Loss of coverage for custom stores.
> > > > >> > >> > > The fact that you can plug in a (relatively) simple implementation of the
> > > > >> > >> > > XStateStore interfaces and automagically get a distributed database out of
> > > > >> > >> > > it is a significant benefit of Kafka Streams. I'd hate to lose it, so it
> > > > >> > >> > > would be better to spend some time and come up with a way to preserve that
> > > > >> > >> > > property. For example, can we provide a default implementation of
> > > > >> > >> > > `commit(..)` that re-implements the existing checkpoint-file approach? Or
> > > > >> > >> > > perhaps add an `isTransactional()` flag to the state store interface so
> > > > >> > >> > > that the runtime can decide whether to continue to manage checkpoint files
> > > > >> > >> > > vs delegating transactionality to the stores?
> > > > >> > >> > >
> > > > >> > >> > > 2. Guarding against OOME
> > > > >> > >> > > I appreciate your analysis, but I don't think it's sufficient to say that
> > > > >> > >> > > we will solve the memory problem later if it becomes necessary. The
> > > > >> > >> > > experience leading to that situation would be quite bad: Imagine, you
> > > > >> > >> > > upgrade to AK 3.next, your tests pass, so you deploy to production. That
> > > > >> > >> > > night, you get paged because your app is now crashing with OOMEs. As with
> > > > >> > >> > > all OOMEs, you'll have a really hard time finding the root cause, and once
> > > > >> > >> > > you do, you won't have a clear path to resolve the issue. You could only
> > > > >> > >> > > tune down the commit interval and cache buffer size until you stop getting
> > > > >> > >> > > crashes.
> > > > >> > >> > >
> > > > >> > >> > > FYI, I know of multiple cases where people run EOS with much larger commit
> > > > >> > >> > > intervals to get better batching than the default, so I don't think this
> > > > >> > >> > > pathological case would be as rare as you suspect.
> > > > >> > >> > >
> > > > >> > >> > > Given that we already have the rudiments of an idea of what we could do to
> > > > >> > >> > > prevent this downside, we should take the time to design a solution. We owe
> > > > >> > >> > > it to our users to ensure that awesome new features don't come with bitter
> > > > >> > >> > > pills unless we can't avoid it.
> > > > >> > >> > >
> > > > >> > >> > > 3. ALOS mode.
> > > > >> > >> > > On the other hand, I didn't see an indication of how stores will be
> > > > >> > >> > > handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality
> > > > >> > >> > > of the store and the processing mode are orthogonal. A transactional store
> > > > >> > >> > > would serve ALOS just as well as a non-transactional one (if not better).
> > > > >> > >> > > Under ALOS, though, the default commit interval is five minutes, so the
> > > > >> > >> > > memory issue is far more pressing.
> > > > >> > >> > >
> > > > >> > >> > > As I see it, we have several options to resolve this point. We could
> > > > >> > >> > > demonstrate that transactional stores work just fine for ALOS and we can
> > > > >> > >> > > therefore just swap over unconditionally. We could also disable the
> > > > >> > >> > > transactional mechanism under ALOS so that stores operate just the same as
> > > > >> > >> > > they do today when run in ALOS mode. Finally, we could do the same as in
> > > > >> > >> > > KIP-844 and make transactional stores opt-in (it'd be better to avoid the
> > > > >> > >> > > extra opt-in mechanism, but it's a good get-out-of-jail-free card).
> > > > >> > >> > >
> > > > >> > >> > > 4. (minor point) Deprecation of methods
> > > > >> > >> > >
> > > > >> > >> > > You mentioned that the new `commit` method replaces flush,
> > > > >> > >> > > updateChangelogOffsets, and checkpoint. It seems to me that the point about
> > > > >> > >> > > atomicity and Position also suggests that it replaces the Position
> > > > >> > >> > > callbacks. However, the proposal only deprecates `flush`. Should we be
> > > > >> > >> > > deprecating other methods as well?
> > > > >> > >> > >
> > > > >> > >> > > Thanks again for the KIP! It's really nice that you and Alex will get the
> > > > >> > >> > > chance to collaborate on both directions so that we can get the best
> > > > >> > >> > > outcome for Streams and its users.
> > > > >> > >> > >
> > > > >> > >> > > -John
> > > > >> > >> > >
> > > > >> > >> > >
> > > > >> > >> > > On 2022/11/21 15:02:15 Nick Telford wrote:
> > > > >> > >> > > > Hi everyone,
> > > > >> > >> > > >
> > > > >> > >> > > > As I mentioned in the discussion thread for KIP-844, I've been working on
> > > > >> > >> > > > an alternative approach to achieving better transactional semantics for
> > > > >> > >> > > > Kafka Streams StateStores.
> > > > >> > >> > > >
> > > > >> > >> > > > I've published this separately as KIP-892: Transactional Semantics for
> > > > >> > >> > > > StateStores
> > > > >> > >> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> > > > >> > >> > > > so that it can be discussed/reviewed separately from KIP-844.
> > > > >> > >> > > >
> > > > >> > >> > > > Alex: I'm especially interested in what you think!
> > > > >> > >> > > >
> > > > >> > >> > > > I have a nearly complete implementation of the changes outlined in this
> > > > >> > >> > > > KIP, please let me know if you'd like me to push them for review in
> > > > >> > >> > > > advance of a vote.
> > > > >> > >> > > >
> > > > >> > >> > > > Regards,
> > > > >> > >> > > >
> > > > >> > >> > > > Nick
> > > > >> > >> > > >
> > > > >> > >> > >
> > > > >> > >> >
> > > > >> > >>
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
