Hey Guozhang,

> 1) About the param passed into the `recover()` function: it seems to me
> that the semantics of "recover(offset)" is: recover this state to a
> transaction boundary which is at least the passed-in offset. And the only
> possibility that the returned offset is different than the passed-in offset
> is that if the previous failure happens after we've done all the commit
> procedures except writing the new checkpoint, in which case the returned
> offset would be larger than the passed-in offset. Otherwise it should
> always be equal to the passed-in offset, is that right?


Right now, the only case in which `recover` returns an offset different
from the passed-in one is when the failure happens *during* the commit.


If the failure happens after the commit but before the checkpoint, `recover`
may return either the passed-in offset or the newer committed offset,
depending on the implementation. The `recover` implementation in the
prototype returns the passed-in offset because it deletes the commit marker
holding the committed offset once the commit is done. In that case, the
store replays the last commit from the changelog. I think that is fine
because changelog replay is idempotent.
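
To make the contract concrete, here is a rough sketch of how a
`recover(checkpointedOffset)` implementation could decide between rolling
forward and falling back to the passed-in offset. The helper names
(readCommitMarker, rollForward, deleteCommitMarker, discardUncommittedWrites)
are made up for illustration and are not the prototype's API:

import java.util.Optional;

// Rough sketch only -- not the prototype's actual code. The helper methods
// below are hypothetical placeholders for whatever the store implementation
// does internally.
abstract class RecoverSketch {

    // Bring the store to a transaction boundary that is at least the
    // checkpointed offset passed in by the task, and return that boundary.
    long recover(final long checkpointedOffset) {
        final Optional<Long> commitMarker = readCommitMarker();
        if (commitMarker.isPresent()) {
            // The failure happened during commit: the changelog transaction
            // is already committed, so roll the local state forward and
            // report the newer committed offset back to the task.
            final long committedOffset = commitMarker.get();
            rollForward(committedOffset);
            deleteCommitMarker();
            return committedOffset;
        }
        // No marker: either the last run shut down cleanly or the failure
        // happened outside of commit. Discard uncommitted writes and let the
        // task replay the changelog from the checkpointed offset; replay is
        // idempotent, so re-applying the last commit is safe.
        discardUncommittedWrites();
        return checkpointedOffset;
    }

    abstract Optional<Long> readCommitMarker();

    abstract void rollForward(long committedOffset);

    abstract void deleteCommitMarker();

    abstract void discardUncommittedWrites();
}

With this shape, the task can always pass the checkpointed offset and start
restoration from whatever `recover` returns.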

> 2) It seems the only use for the "transactional()" function is to determine
> if we can update the checkpoint file while in EOS.


Right now, there are two other uses for `transactional()`:
1. To determine what to do during initialization if the checkpoint is gone
(see [1]). If the state store is transactional, we don't have to wipe the
existing data. Thinking about it now, we do not really need to check
whether the store is transactional, because if it is not, we would not have
written the checkpoint in the first place. I am going to remove that check.
2. To determine whether the persistent KV store in KStreamImplJoin should
be transactional (see [2], [3]).

I am not sure if we can get rid of the checks in point 2. If we can, I'd be
happy to encapsulate the `transactional()` logic in `commit`/`recover`.
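
For reference, this is how I read your c)-e) suggestions from the message
quoted below as default implementations for existing non-transactional
stores. Again, just a sketch with made-up names (NO_CHECKPOINT,
wipeLocalState), not a proposed API:

// Sketch of default commit()/recover() for existing non-transactional
// stores, following points c)-e) quoted below. Names are illustrative only.
interface NonTransactionalDefaults {

    long NO_CHECKPOINT = -1L; // sentinel: "do not write a checkpoint under EOS"

    void flush();

    void wipeLocalState();

    // Default commit() is just a flush; returning the sentinel tells the
    // task to skip writing the checkpoint file under EOS.
    default long commit(final long changelogOffset) {
        flush();
        return NO_CHECKPOINT;
    }

    // Default recover(): a sentinel checkpoint means the previous run did
    // not shut down cleanly, so wipe the local state and restore from
    // offset 0; otherwise the last shutdown was clean and this is a no-op.
    default long recover(final long checkpointedOffset) {
        if (checkpointedOffset == NO_CHECKPOINT) {
            wipeLocalState();
            return 0L;
        }
        return checkpointedOffset;
    }
}

If that works, both the checkpoint-writing decision and the wipe-on-recovery
decision collapse into the return values, and `transactional()` becomes
unnecessary.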

Best,
Alex

1.
https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
2.
https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
3.
https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354

On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Alex,
>
> Excellent proposal, I'm very keen to see this land!
>
> Would it be useful to permit configuring the type of store used for
> uncommitted offsets on a store-by-store basis? This way, users could choose
> whether to use, e.g. an in-memory store or RocksDB, potentially reducing
> the overheads associated with RocksDb for smaller stores, but without the
> memory pressure issues?
>
> I suspect that in most cases, the number of uncommitted records will be
> very small, because the default commit interval is 100ms.
>
> Regards,
>
> Nick
>
> On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Alex,
> >
> > Thanks for the updated KIP, I looked over it and browsed the WIP and just
> > have a couple meta thoughts:
> >
> > 1) About the param passed into the `recover()` function: it seems to me
> > that the semantics of "recover(offset)" is: recover this state to a
> > transaction boundary which is at least the passed-in offset. And the only
> > possibility that the returned offset is different than the passed-in
> offset
> > is that if the previous failure happens after we've done all the commit
> > procedures except writing the new checkpoint, in which case the returned
> > offset would be larger than the passed-in offset. Otherwise it should
> > always be equal to the passed-in offset, is that right?
> >
> > 2) It seems the only use for the "transactional()" function is to
> determine
> > if we can update the checkpoint file while in EOS. But the purpose of the
> > checkpoint file's offsets is just to tell "the local state's current
> > snapshot's progress is at least the indicated offsets" anyways, and with
> > this KIP maybe we would just do:
> >
> > a) when in ALOS, upon failover: we set the starting offset as
> > checkpointed-offset, then restore() from changelog till the end-offset.
> > This way we may restore some records twice.
> > b) when in EOS, upon failover: we first call
> recover(checkpointed-offset),
> > then set the starting offset as the returned offset (which may be larger
> > than checkpointed-offset), then restore until the end-offset.
> >
> > So why not also:
> > c) we let the `commit()` function to also return an offset, which
> indicates
> > "checkpointable offsets".
> > d) for existing non-transactional stores, we just have a default
> > implementation of "commit()" which is simply a flush, and returns a
> > sentinel value like -1. Then later if we get checkpointable offsets -1,
> we
> > do not write the checkpoint. Upon clean shutting down we can just
> > checkpoint regardless of the returned value from "commit".
> > e) for existing non-transactional stores, we just have a default
> > implementation of "recover()" which is to wipe out the local store and
> > return offset 0 if the passed in offset is -1, otherwise if not -1 then
> it
> > indicates a clean shutdown in the last run, can this function is just a
> > no-op.
> >
> > In that case, we would not need the "transactional()" function anymore,
> > since for non-transactional stores their behaviors are still wrapped in
> the
> > `commit / recover` function pairs.
> >
> > I have not completed the thorough pass on your WIP PR, so maybe I could
> > come up with some more feedback later, but just let me know if my
> > understanding above is correct or not?
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov
> > <asorokou...@confluent.io.invalid> wrote:
> >
> > > Hi,
> > >
> > > I updated the KIP with the following changes:
> > > * Replaced in-memory batches with the secondary-store approach as the
> > > default implementation to address the feedback about memory pressure as
> > > suggested by Sagar and Bruno.
> > > * Introduced StateStore#commit and StateStore#recover methods as an
> > > extension of the rollback idea. @Guozhang, please see the comment below
> > on
> > > why I took a slightly different approach than you suggested.
> > > * Removed mentions of changes to IQv1 and IQv2. Transactional state
> > stores
> > > enable reading committed in IQ, but it is really an independent feature
> > > that deserves its own KIP. Conflating them unnecessarily increases the
> > > scope for discussion, implementation, and testing in a single unit of
> > work.
> > >
> > > I also published a prototype -
> > https://github.com/apache/kafka/pull/12393
> > > that implements changes described in the proposal.
> > >
> > > Regarding explicit rollback, I think it is a powerful idea that allows
> > > other StateStore implementations to take a different path to the
> > > transactional behavior rather than keep 2 state stores. Instead of
> > > introducing a new commit token, I suggest using a changelog offset that
> > > already 1:1 corresponds to the materialized state. This works nicely
> > > because Kafka Stream first commits an AK transaction and only then
> > > checkpoints the state store, so we can use the changelog offset to
> commit
> > > the state store transaction.
> > >
> > > I called the method StateStore#recover rather than StateStore#rollback
> > > because a state store might either roll back or forward depending on
> the
> > > specific point of the crash failure. Consider the write algorithm in
> Kafka
> > > Streams is:
> > > 1. write stuff to the state store
> > > 2. producer.sendOffsetsToTransaction(token);
> > producer.commitTransaction();
> > > 3. flush
> > > 4. checkpoint
> > >
> > > Let's consider 3 cases:
> > > 1. If the crash failure happens between #2 and #3, the state store
> rolls
> > > back and replays the uncommitted transaction from the changelog.
> > > 2. If the crash failure happens during #3, the state store can roll
> > forward
> > > and finish the flush/commit.
> > > 3. If the crash failure happens between #3 and #4, the state store
> should
> > > do nothing during recovery and just proceed with the checkpoint.
> > >
> > > Looking forward to your feedback,
> > > Alexander
> > >
> > > On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov <
> > > asorokou...@confluent.io> wrote:
> > >
> > > > Hi,
> > > >
> > > > As a status update, I did the following changes to the KIP:
> > > > * replaced configuration via the top-level config with configuration
> > via
> > > > Stores factory and StoreSuppliers,
> > > > * added IQv2 and elaborated how readCommitted will work when the
> store
> > is
> > > > not transactional,
> > > > * removed claims about ALOS.
> > > >
> > > > I am going to be OOO in the next couple of weeks and will resume
> > working
> > > > on the proposal and responding to the discussion in this thread
> > starting
> > > > June 27. My next top priorities are:
> > > > 1. Prototype the rollback approach as suggested by Guozhang.
> > > > 2. Replace in-memory batches with the secondary-store approach as the
> > > > default implementation to address the feedback about memory pressure
> as
> > > > suggested by Sagar and Bruno.
> > > > 3. Adjust Stores methods to make transactional implementations
> > pluggable.
> > > > 4. Publish the POC for the first review.
> > > >
> > > > Best regards,
> > > > Alex
> > > >
> > > > On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > > >
> > > >> Alex,
> > > >>
> > > >> Thanks for your replies! That is very helpful.
> > > >>
> > > >> Just to broaden our discussions a bit here, I think there are some
> > other
> > > >> approaches in parallel to the idea of "enforce to only persist upon
> > > >> explicit flush" and I'd like to throw one here -- not really
> > advocating
> > > >> it,
> > > >> but just for us to compare the pros and cons:
> > > >>
> > > >> 1) We let the StateStore's `flush` function to return a token
> instead
> > of
> > > >> returning `void`.
> > > >> 2) We add another `rollback(token)` interface of StateStore which
> > would
> > > >> effectively rollback the state as indicated by the token to the
> > snapshot
> > > >> when the corresponding `flush` is called.
> > > >> 3) We encode the token and commit as part of
> > > >> `producer#sendOffsetsToTransaction`.
> > > >>
> > > >> Users could optionally implement the new functions, or they can just
> > not
> > > >> return the token at all and not implement the second function.
> Again,
> > > the
> > > >> APIs are just for the sake of illustration, not feeling they are the
> > > most
> > > >> natural :)
> > > >>
> > > >> Then the procedure would be:
> > > >>
> > > >> 1. the previous checkpointed offset is 100
> > > >> ...
> > > >> 3. flush store, make sure all writes are persisted; get the returned
> > > token
> > > >> that indicates the snapshot of 200.
> > > >> 4. producer.sendOffsetsToTransaction(token);
> > > producer.commitTransaction();
> > > >> 5. Update the checkpoint file (say, the new value is 200).
> > > >>
> > > >> Then if there's a failure, say between 3/4, we would get the token
> > from
> > > >> the
> > > >> last committed txn, and first we would do the restoration (which may
> > get
> > > >> the state to somewhere between 100 and 200), then call
> > > >> `store.rollback(token)` to rollback to the snapshot of offset 100.
> > > >>
> > > >> The pros is that we would then not need to enforce the state stores
> to
> > > not
> > > >> persist any data during the txn: for stores that may not be able to
> > > >> implement the `rollback` function, they can still reduce its impl to
> > > "not
> > > >> persisting any data" via this API, but for stores that can indeed
> > > support
> > > >> the rollback, their implementation may be more efficient. The cons
> > > though,
> > > >> on top of my head are 1) more complicated logic differentiating
> > between
> > > >> EOS
> > > >> with and without store rollback support, and ALOS, 2) encoding the
> > token
> > > >> as
> > > >> part of the commit offset is not ideal if it is big, 3) the recovery
> > > logic
> > > >> including the state store is also a bit more complicated.
> > > >>
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov
> > > >> <asorokou...@confluent.io.invalid> wrote:
> > > >>
> > > >> > Hi Guozhang,
> > > >> >
> > > >> > But I'm still trying to clarify how it guarantees EOS, and it
> seems
> > > >> that we
> > > >> > > would achieve it by enforcing to not persist any data written
> > within
> > > >> this
> > > >> > > transaction until step 4. Is that correct?
> > > >> >
> > > >> >
> > > >> > This is correct. Both alternatives - in-memory WriteBatchWithIndex
> > and
> > > >> > transactionality via the secondary store guarantee EOS by not
> > > persisting
> > > >> > data in the "main" state store until it is committed in the
> > changelog
> > > >> > topic.
> > > >> >
> > > >> > Oh what I meant is not what KStream code does, but that StateStore
> > > impl
> > > >> > > classes themselves could potentially flush data to become
> > persisted
> > > >> > > asynchronously
> > > >> >
> > > >> >
> > > >> > Thank you for elaborating! You are correct, the underlying state
> > store
> > > >> > should not persist data until the streams app calls
> > StateStore#flush.
> > > >> There
> > > >> > are 2 options how a State Store implementation can guarantee that
> -
> > > >> either
> > > >> > keep uncommitted writes in memory or be able to roll back the
> > changes
> > > >> that
> > > >> > were not committed during recovery. RocksDB's WriteBatchWithIndex
> is
> > > an
> > > >> > implementation of the first option. A considered alternative,
> > > >> Transactions
> > > >> > via Secondary State Store for Uncommitted Changes, is the way to
> > > >> implement
> > > >> > the second option.
> > > >> >
> > > >> > As everyone correctly pointed out, keeping uncommitted data in
> > memory
> > > >> > introduces a very real risk of OOM that we will need to handle.
> The
> > > >> more I
> > > >> > think about it, the more I lean towards going with the
> Transactions
> > > via
> > > >> > Secondary Store as the way to implement transactionality as it
> does
> > > not
> > > >> > have that issue.
> > > >> >
> > > >> > Best,
> > > >> > Alex
> > > >> >
> > > >> >
> > > >> > On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com
> >
> > > >> wrote:
> > > >> >
> > > >> > > Hello Alex,
> > > >> > >
> > > >> > > > we flush the cache, but not the underlying state store.
> > > >> > >
> > > >> > > You're right. The ordering I mentioned above is actually:
> > > >> > >
> > > >> > > ...
> > > >> > > 3. producer.sendOffsetsToTransaction();
> > > producer.commitTransaction();
> > > >> > > 4. flush store, make sure all writes are persisted.
> > > >> > > 5. Update the checkpoint file to 200.
> > > >> > >
> > > >> > > But I'm still trying to clarify how it guarantees EOS, and it
> > seems
> > > >> that
> > > >> > we
> > > >> > > would achieve it by enforcing to not persist any data written
> > within
> > > >> this
> > > >> > > transaction until step 4. Is that correct?
> > > >> > >
> > > >> > > > Can you please point me to the place in the codebase where we
> > > >> trigger
> > > >> > > async flush before the commit?
> > > >> > >
> > > >> > > Oh what I meant is not what KStream code does, but that
> StateStore
> > > >> impl
> > > >> > > classes themselves could potentially flush data to become
> > persisted
> > > >> > > asynchronously, e.g. RocksDB does that naturally out of the
> > control
> > > of
> > > >> > > KStream code. I think it is related to my previous question: if
> we
> > > >> think
> > > >> > by
> > > >> > > guaranteeing EOS at the state store level, we would effectively
> > ask
> > > >> the
> > > >> > > impl classes that "you should not persist any data until `flush`
> > is
> > > >> > called
> > > >> > > explicitly", is the StateStore interface the right level to
> > enforce
> > > >> such
> > > >> > > mechanisms, or should we just do that on top of the StateStores,
> > > e.g.
> > > >> > > during the transaction we just keep all the writes in the cache
> > (of
> > > >> > course
> > > >> > > we need to consider how to work around memory pressure as
> > previously
> > > >> > > mentioned), and then upon committing, we just write the cached
> > > records
> > > >> > as a
> > > >> > > whole into the store and then call flush.
> > > >> > >
> > > >> > >
> > > >> > > Guozhang
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov
> > > >> > > <asorokou...@confluent.io.invalid> wrote:
> > > >> > >
> > > >> > > > Hey,
> > > >> > > >
> > > >> > > > Thank you for the wealth of great suggestions and questions! I
> > am
> > > >> going
> > > >> > > to
> > > >> > > > address the feedback in batches and update the proposal async,
> > as
> > > >> it is
> > > >> > > > probably going to be easier for everyone. I will also write a
> > > >> separate
> > > >> > > > message after making updates to the KIP.
> > > >> > > >
> > > >> > > > @John,
> > > >> > > >
> > > >> > > > > Did you consider instead just adding the option to the
> > > >> > > > > RocksDB*StoreSupplier classes and the factories in Stores ?
> > > >> > > >
> > > >> > > > Thank you for suggesting that. I think that this idea is
> better
> > > than
> > > >> > > what I
> > > >> > > > came up with and will update the KIP with configuring
> > > >> transactionality
> > > >> > > via
> > > >> > > > the suppliers and Stores.
> > > >> > > >
> > > >> > > > what is the advantage over just doing the same thing with the
> > > >> > RecordCache
> > > >> > > > > and not introducing the WriteBatch at all?
> > > >> > > >
> > > >> > > > Can you point me to RecordCache? I can't find it in the
> project.
> > > The
> > > >> > > > advantage would be that WriteBatch guarantees write atomicity.
> > As
> > > >> far
> > > >> > as
> > > >> > > I
> > > >> > > > understood the way RecordCache works, it might leave the
> system
> > in
> > > >> an
> > > >> > > > inconsistent state during crash failure on write.
> > > >> > > >
> > > >> > > > You mentioned that a transactional store can help reduce
> > > >> duplication in
> > > >> > > the
> > > >> > > > > case of ALOS
> > > >> > > >
> > > >> > > > I will remove claims about ALOS from the proposal. Thank you
> for
> > > >> > > > elaborating!
> > > >> > > >
> > > >> > > > As a reminder, we have a new IQv2 mechanism now. Should we
> > propose
> > > >> any
> > > >> > > > > changes to IQv1 to support this transactional mechanism,
> > versus
> > > >> just
> > > >> > > > > proposing it for IQv2? Certainly, it seems strange only to
> > > >> propose a
> > > >> > > > change
> > > >> > > > > for IQv1 and not v2.
> > > >> > > >
> > > >> > > >
> > > >> > > >  I will update the proposal with complementary API changes for
> > > IQv2
> > > >> > > >
> > > >> > > > What should IQ do if I request to readCommitted on a
> > > >> non-transactional
> > > >> > > > > store?
> > > >> > > >
> > > >> > > > We can assume that non-transactional stores commit on write,
> so
> > IQ
> > > >> > works
> > > >> > > in
> > > >> > > > the same way with non-transactional stores regardless of the
> > value
> > > >> of
> > > >> > > > readCommitted.
> > > >> > > >
> > > >> > > >
> > > >> > > >  @Guozhang,
> > > >> > > >
> > > >> > > > * If we crash between line 3 and 4, then at that time the
> local
> > > >> > > persistent
> > > >> > > > > store image is representing as of offset 200, but upon
> > recovery
> > > >> all
> > > >> > > > > changelog records from 100 to log-end-offset would be
> > considered
> > > >> as
> > > >> > > > aborted
> > > >> > > > > and not be replayed and we would restart processing from
> > > position
> > > >> > 100.
> > > >> > > > > Restart processing will violate EOS.I'm not sure how e.g.
> > > >> RocksDB's
> > > >> > > > > WriteBatchWithIndex would make sure that the step 4 and
> step 5
> > > >> could
> > > >> > be
> > > >> > > > > done atomically here.
> > > >> > > >
> > > >> > > >
> > > >> > > > Could you please point me to the place in the codebase where a
> > > task
> > > >> > > flushes
> > > >> > > > the store before committing the transaction?
> > > >> > > > Looking at TaskExecutor (
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167
> > > >> > > > ),
> > > >> > > > StreamTask#prepareCommit (
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398
> > > >> > > > ),
> > > >> > > > and CachedStateStore (
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34
> > > >> > > > )
> > > >> > > > we flush the cache, but not the underlying state store.
> Explicit
> > > >> > > > StateStore#flush happens in AbstractTask#maybeWriteCheckpoint
> (
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99
> > > >> > > > ).
> > > >> > > > Is there something I am missing here?
> > > >> > > >
> > > >> > > > Today all cached data that have not been flushed are not
> > committed
> > > >> for
> > > >> > > > > sure, but even flushed data to the persistent underlying
> store
> > > may
> > > >> > also
> > > >> > > > be
> > > >> > > > > uncommitted since flushing can be triggered asynchronously
> > > before
> > > >> the
> > > >> > > > > commit.
> > > >> > > >
> > > >> > > > Can you please point me to the place in the codebase where we
> > > >> trigger
> > > >> > > async
> > > >> > > > flush before the commit? This would certainly be a reason to
> > > >> introduce
> > > >> > a
> > > >> > > > dedicated StateStore#commit method.
> > > >> > > >
> > > >> > > > Thanks again for the feedback. I am going to update the KIP
> and
> > > then
> > > >> > > > respond to the next batch of questions and suggestions.
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Alex
> > > >> > > >
> > > >> > > > On Mon, May 30, 2022 at 5:13 PM Suhas Satish
> > > >> > > <ssat...@confluent.io.invalid
> > > >> > > > >
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > > > Thanks for the KIP proposal Alex.
> > > >> > > > > 1. Configuration default
> > > >> > > > >
> > > >> > > > > You mention applications using streams DSL with built-in
> > rocksDB
> > > >> > state
> > > >> > > > > store will get transactional state stores by default when
> EOS
> > is
> > > >> > > enabled,
> > > >> > > > > but the default implementation for apps using PAPI will
> > fallback
> > > >> to
> > > >> > > > > non-transactional behavior.
> > > >> > > > > Shouldn't we have the same default behavior for both types
> of
> > > >> apps -
> > > >> > > DSL
> > > >> > > > > and PAPI?
> > > >> > > > >
> > > >> > > > > On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <
> > > cado...@apache.org
> > > >> >
> > > >> > > > wrote:
> > > >> > > > >
> > > >> > > > > > Thanks for the PR, Alex!
> > > >> > > > > >
> > > >> > > > > > I am also glad to see this coming.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > 1. Configuration
> > > >> > > > > >
> > > >> > > > > > I would also prefer to restrict the configuration of
> > > >> transactional
> > > >> > on
> > > >> > > > > > the state sore. Ideally, calling method transactional() on
> > the
> > > >> > state
> > > >> > > > > > store would be enough. An option on the store builder
> would
> > > >> make it
> > > >> > > > > > possible to turn transactionality on and off (as John
> > > proposed).
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > 2. Memory usage in RocksDB
> > > >> > > > > >
> > > >> > > > > > This seems to be a major issue. We do not have any
> guarantee
> > > >> that
> > > >> > > > > > uncommitted writes fit into memory and I guess we will
> never
> > > >> have.
> > > >> > > What
> > > >> > > > > > happens when the uncommitted writes do not fit into
> memory?
> > > Does
> > > >> > > > RocksDB
> > > >> > > > > > throw an exception? Can we handle such an exception
> without
> > > >> > crashing?
> > > >> > > > > >
> > > >> > > > > > Does the RocksDB behavior even need to be included in this
> > > KIP?
> > > >> In
> > > >> > > the
> > > >> > > > > > end it is an implementation detail.
> > > >> > > > > >
> > > >> > > > > > What we should consider - though - is a memory limit in
> some
> > > >> form.
> > > >> > > And
> > > >> > > > > > what we do when the memory limit is exceeded.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > 3. PoC
> > > >> > > > > >
> > > >> > > > > > I agree with Guozhang that a PoC is a good idea to better
> > > >> > understand
> > > >> > > > the
> > > >> > > > > > devils in the details.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > Best,
> > > >> > > > > > Bruno
> > > >> > > > > >
> > > >> > > > > > On 25.05.22 01:52, Guozhang Wang wrote:
> > > >> > > > > > > Hello Alex,
> > > >> > > > > > >
> > > >> > > > > > > Thanks for writing the proposal! Glad to see it coming.
> I
> > > >> think
> > > >> > > this
> > > >> > > > is
> > > >> > > > > > the
> > > >> > > > > > > kind of a KIP that since too many devils would be buried
> > in
> > > >> the
> > > >> > > > details
> > > >> > > > > > and
> > > >> > > > > > > it's better to start working on a POC, either in
> parallel,
> > > or
> > > >> > > before
> > > >> > > > we
> > > >> > > > > > > resume our discussion, rather than blocking any
> > > implementation
> > > >> > > until
> > > >> > > > we
> > > >> > > > > > are
> > > >> > > > > > > satisfied with the proposal.
> > > >> > > > > > >
> > > >> > > > > > > Just as a concrete example, I personally am still not
> 100%
> > > >> clear
> > > >> > > how
> > > >> > > > > the
> > > >> > > > > > > proposal would work to achieve EOS with the state
> stores.
> > > For
> > > >> > > > example,
> > > >> > > > > > the
> > > >> > > > > > > commit procedure today looks like this:
> > > >> > > > > > >
> > > >> > > > > > > 0: there's an existing checkpoint file indicating the
> > > >> changelog
> > > >> > > > offset
> > > >> > > > > of
> > > >> > > > > > > the local state store image is 100. Now a commit is
> > > triggered:
> > > >> > > > > > > 1. flush cache (since it contains partially processed
> > > >> records),
> > > >> > > make
> > > >> > > > > sure
> > > >> > > > > > > all records are written to the producer.
> > > >> > > > > > > 2. flush producer, making sure all changelog records
> have
> > > now
> > > >> > > acked.
> > > >> > > > //
> > > >> > > > > > > here we would get the new changelog position, say 200
> > > >> > > > > > > 3. flush store, make sure all writes are persisted.
> > > >> > > > > > > 4. producer.sendOffsetsToTransaction();
> > > >> > > producer.commitTransaction();
> > > >> > > > > //
> > > >> > > > > > we
> > > >> > > > > > > would make the writes in changelog up to offset 200
> > > committed
> > > >> > > > > > > 5. Update the checkpoint file to 200.
> > > >> > > > > > >
> > > >> > > > > > > The question about atomicity between those lines, for
> > > example:
> > > >> > > > > > >
> > > >> > > > > > > * If we crash between line 4 and line 5, the local
> > > checkpoint
> > > >> > file
> > > >> > > > > would
> > > >> > > > > > > stay as 100, and upon recovery we would replay the
> > changelog
> > > >> from
> > > >> > > 100
> > > >> > > > > to
> > > >> > > > > > > 200. This is not ideal but does not violate EOS, since
> the
> > > >> > > changelogs
> > > >> > > > > are
> > > >> > > > > > > all overwrites anyways.
> > > >> > > > > > > * If we crash between line 3 and 4, then at that time
> the
> > > >> local
> > > >> > > > > > persistent
> > > >> > > > > > > store image is representing as of offset 200, but upon
> > > >> recovery
> > > >> > all
> > > >> > > > > > > changelog records from 100 to log-end-offset would be
> > > >> considered
> > > >> > as
> > > >> > > > > > aborted
> > > >> > > > > > > and not be replayed and we would restart processing from
> > > >> position
> > > >> > > > 100.
> > > >> > > > > > > Restart processing will violate EOS.I'm not sure how
> e.g.
> > > >> > RocksDB's
> > > >> > > > > > > WriteBatchWithIndex would make sure that the step 4 and
> > > step 5
> > > >> > > could
> > > >> > > > be
> > > >> > > > > > > done atomically here.
> > > >> > > > > > >
> > > >> > > > > > > Originally what I was thinking when creating the JIRA
> > ticket
> > > >> is
> > > >> > > that
> > > >> > > > we
> > > >> > > > > > > need to let the state store to provide a transactional
> API
> > > >> like
> > > >> > > > "token
> > > >> > > > > > > commit()" used in step 4) above which returns a token,
> > that
> > > >> e.g.
> > > >> > in
> > > >> > > > our
> > > >> > > > > > > example above indicates offset 200, and that token would
> > be
> > > >> > written
> > > >> > > > as
> > > >> > > > > > part
> > > >> > > > > > > of the records in Kafka transaction in step 5). And upon
> > > >> recovery
> > > >> > > the
> > > >> > > > > > state
> > > >> > > > > > > store would have another API like "rollback(token)"
> where
> > > the
> > > >> > token
> > > >> > > > is
> > > >> > > > > > read
> > > >> > > > > > > from the latest committed txn, and be used to rollback
> the
> > > >> store
> > > >> > to
> > > >> > > > > that
> > > >> > > > > > > committed image. I think your proposal is different, and
> > it
> > > >> seems
> > > >> > > > like
> > > >> > > > > > > you're proposing we swap step 3) and 4) above, but the
> > > >> atomicity
> > > >> > > > issue
> > > >> > > > > > > still remains since now you may have the store image at
> > 100
> > > >> but
> > > >> > the
> > > >> > > > > > > changelog is committed at 200. I'd like to learn more
> > about
> > > >> the
> > > >> > > > details
> > > >> > > > > > > on how it resolves such issues.
> > > >> > > > > > >
> > > >> > > > > > > Anyways, that's just an example to make the point that
> > there
> > > >> are
> > > >> > > lots
> > > >> > > > > of
> > > >> > > > > > > implementational details which would drive the public
> API
> > > >> design,
> > > >> > > and
> > > >> > > > > we
> > > >> > > > > > > should probably first do a POC, and come back to discuss
> > the
> > > >> KIP.
> > > >> > > Let
> > > >> > > > > me
> > > >> > > > > > > know what you think?
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > Guozhang
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > On Tue, May 24, 2022 at 10:35 AM Sagar <
> > > >> > sagarmeansoc...@gmail.com>
> > > >> > > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > >> Hi Alexander,
> > > >> > > > > > >>
> > > >> > > > > > >> Thanks for the KIP! This seems like a great proposal. I
> > > have
> > > >> the
> > > >> > > > same
> > > >> > > > > > >> opinion as John on the Configuration part though. I
> think
> > > >> the 2
> > > >> > > > level
> > > >> > > > > > >> config and its behaviour based on the setting/unsetting
> > of
> > > >> the
> > > >> > > flag
> > > >> > > > > > seems
> > > >> > > > > > >> confusing to me as well. Since the KIP seems
> specifically
> > > >> > centred
> > > >> > > > > around
> > > >> > > > > > >> RocksDB it might be better to add it at the Supplier
> > level
> > > as
> > > >> > John
> > > >> > > > > > >> suggested.
> > > >> > > > > > >>
> > > >> > > > > > >> On similar lines, this config name =>
> > > >> > > > > > *statestore.transactional.mechanism
> > > >> > > > > > >> *may
> > > >> > > > > > >> also need rethinking as the value assigned to
> > > >> > > it(rocksdb_indexbatch)
> > > >> > > > > > >> implicitly seems to assume that rocksdb is the only
> > > >> statestore
> > > >> > > that
> > > >> > > > > > Kafka
> > > >> > > > > > >> Stream supports while that's not the case.
> > > >> > > > > > >>
> > > >> > > > > > >> Also, regarding the potential memory pressure that can
> be
> > > >> > > introduced
> > > >> > > > > by
> > > >> > > > > > >> WriteBatchIndex, do you think it might make more sense
> to
> > > >> > include
> > > >> > > > some
> > > >> > > > > > >> numbers/benchmarks on how much the memory consumption
> > might
> > > >> > > > increase?
> > > >> > > > > > >>
> > > >> > > > > > >> Lastly, the read_uncommitted flag's behaviour on IQ may
> > > need
> > > >> > more
> > > >> > > > > > >> elaboration.
> > > >> > > > > > >>
> > > >> > > > > > >> These points aside, as I said, this is a great
> proposal!
> > > >> > > > > > >>
> > > >> > > > > > >> Thanks!
> > > >> > > > > > >> Sagar.
> > > >> > > > > > >>
> > > >> > > > > > >> On Tue, May 24, 2022 at 10:35 PM John Roesler <
> > > >> > > vvcep...@apache.org>
> > > >> > > > > > wrote:
> > > >> > > > > > >>
> > > >> > > > > > >>> Thanks for the KIP, Alex!
> > > >> > > > > > >>>
> > > >> > > > > > >>> I'm really happy to see your proposal. This
> improvement
> > > >> fills a
> > > >> > > > > > >>> long-standing gap.
> > > >> > > > > > >>>
> > > >> > > > > > >>> I have a few questions:
> > > >> > > > > > >>>
> > > >> > > > > > >>> 1. Configuration
> > > >> > > > > > >>> The KIP only mentions RocksDB, but of course, Streams
> > also
> > > >> > ships
> > > >> > > > with
> > > >> > > > > > an
> > > >> > > > > > >>> InMemory store, and users also plug in their own
> custom
> > > >> state
> > > >> > > > stores.
> > > >> > > > > > It
> > > >> > > > > > >> is
> > > >> > > > > > >>> also common to use multiple types of state stores in
> the
> > > >> same
> > > >> > > > > > application
> > > >> > > > > > >>> for different purposes.
> > > >> > > > > > >>>
> > > >> > > > > > >>> Against this backdrop, the choice to configure
> > > >> transactionality
> > > >> > > as
> > > >> > > > a
> > > >> > > > > > >>> top-level config, as well as to configure the store
> > > >> transaction
> > > >> > > > > > mechanism
> > > >> > > > > > >>> as a top-level config, seems a bit off.
> > > >> > > > > > >>>
> > > >> > > > > > >>> Did you consider instead just adding the option to the
> > > >> > > > > > >>> RocksDB*StoreSupplier classes and the factories in
> > Stores
> > > ?
> > > >> It
> > > >> > > > seems
> > > >> > > > > > like
> > > >> > > > > > >>> the desire to enable the feature by default, but with
> a
> > > >> > > > feature-flag
> > > >> > > > > to
> > > >> > > > > > >>> disable it was a factor here. However, as you pointed
> > out,
> > > >> > there
> > > >> > > > are
> > > >> > > > > > some
> > > >> > > > > > >>> major considerations that users should be aware of, so
> > > >> opt-in
> > > >> > > > doesn't
> > > >> > > > > > >> seem
> > > >> > > > > > >>> like a bad choice, either. You could add an Enum
> > argument
> > > to
> > > >> > > those
> > > >> > > > > > >>> factories like `RocksDBTransactionalMechanism.{NONE,
> > > >> > > > > > >>>
> > > >> > > > > > >>> Some points in favor of this approach:
> > > >> > > > > > >>> * Avoid "stores that don't support transactions ignore
> > the
> > > >> > > config"
> > > >> > > > > > >>> complexity
> > > >> > > > > > >>> * Users can choose how to spend their memory budget,
> > > making
> > > >> > some
> > > >> > > > > stores
> > > >> > > > > > >>> transactional and others not
> > > >> > > > > > >>> * When we add transactional support to in-memory
> stores,
> > > we
> > > >> > don't
> > > >> > > > > have
> > > >> > > > > > to
> > > >> > > > > > >>> figure out what to do with the mechanism config (i.e.,
> > > what
> > > >> do
> > > >> > > you
> > > >> > > > > set
> > > >> > > > > > >> the
> > > >> > > > > > >>> mechanism to when there are multiple kinds of
> > > transactional
> > > >> > > stores
> > > >> > > > in
> > > >> > > > > > the
> > > >> > > > > > >>> topology?)
> > > >> > > > > > >>>
> > > >> > > > > > >>> 2. caching/flushing/transactions
> > > >> > > > > > >>> The coupling between memory usage and flushing that
> you
> > > >> > mentioned
> > > >> > > > is
> > > >> > > > > a
> > > >> > > > > > >> bit
> > > >> > > > > > >>> troubling. It also occurs to me that there seems to be
> > > some
> > > >> > > > > > relationship
> > > >> > > > > > >>> with the existing record cache, which is also an
> > in-memory
> > > >> > > holding
> > > >> > > > > area
> > > >> > > > > > >> for
> > > >> > > > > > >>> records that are not yet written to the cache and/or
> > store
> > > >> > > (albeit
> > > >> > > > > with
> > > >> > > > > > >> no
> > > >> > > > > > >>> particular semantics). Have you considered how all
> these
> > > >> > > components
> > > >> > > > > > >> should
> > > >> > > > > > >>> relate? For example, should a "full" WriteBatch
> actually
> > > >> > trigger
> > > >> > > a
> > > >> > > > > > flush
> > > >> > > > > > >> so
> > > >> > > > > > >>> that we don't get OOMEs? If the proposed transactional
> > > >> > mechanism
> > > >> > > > > forces
> > > >> > > > > > >> all
> > > >> > > > > > >>> uncommitted writes to be buffered in memory, until a
> > > commit,
> > > >> > then
> > > >> > > > > what
> > > >> > > > > > is
> > > >> > > > > > >>> the advantage over just doing the same thing with the
> > > >> > RecordCache
> > > >> > > > and
> > > >> > > > > > not
> > > >> > > > > > >>> introducing the WriteBatch at all?
> > > >> > > > > > >>>
> > > >> > > > > > >>> 3. ALOS
> > > >> > > > > > >>> You mentioned that a transactional store can help
> reduce
> > > >> > > > duplication
> > > >> > > > > in
> > > >> > > > > > >>> the case of ALOS. We might want to be careful about
> > claims
> > > >> like
> > > >> > > > that.
> > > >> > > > > > >>> Duplication isn't the way that repeated processing
> > > >> manifests in
> > > >> > > > state
> > > >> > > > > > >>> stores. Rather, it is in the form of dirty reads
> during
> > > >> > > > reprocessing.
> > > >> > > > > > >> This
> > > >> > > > > > >>> feature may reduce the incidence of dirty reads during
> > > >> > > > reprocessing,
> > > >> > > > > > but
> > > >> > > > > > >>> not in a predictable way. During regular processing
> > today,
> > > >> we
> > > >> > > will
> > > >> > > > > send
> > > >> > > > > > >>> some records through to the changelog in between
> commit
> > > >> > > intervals.
> > > >> > > > > > Under
> > > >> > > > > > >>> ALOS, if any of those dirty writes gets committed to
> the
> > > >> > > changelog
> > > >> > > > > > topic,
> > > >> > > > > > >>> then upon failure, we have to roll the store forward
> to
> > > them
> > > >> > > > anyway,
> > > >> > > > > > >>> regardless of this new transactional mechanism.
> That's a
> > > >> > fixable
> > > >> > > > > > problem,
> > > >> > > > > > >>> by the way, but this KIP doesn't seem to fix it. I
> > wonder
> > > >> if we
> > > >> > > > > should
> > > >> > > > > > >> make
> > > >> > > > > > >>> any claims about the relationship of this feature to
> > ALOS
> > > if
> > > >> > the
> > > >> > > > > > >> real-world
> > > >> > > > > > >>> behavior is so complex.
> > > >> > > > > > >>>
> > > >> > > > > > >>> 4. IQ
> > > >> > > > > > >>> As a reminder, we have a new IQv2 mechanism now.
> Should
> > we
> > > >> > > propose
> > > >> > > > > any
> > > >> > > > > > >>> changes to IQv1 to support this transactional
> mechanism,
> > > >> versus
> > > >> > > > just
> > > >> > > > > > >>> proposing it for IQv2? Certainly, it seems strange
> only
> > to
> > > >> > > propose
> > > >> > > > a
> > > >> > > > > > >> change
> > > >> > > > > > >>> for IQv1 and not v2.
> > > >> > > > > > >>>
> > > >> > > > > > >>> Regarding your proposal for IQv1, I'm unsure what the
> > > >> behavior
> > > >> > > > should
> > > >> > > > > > be
> > > >> > > > > > >>> for readCommitted, since the current behavior also
> reads
> > > >> out of
> > > >> > > the
> > > >> > > > > > >>> RecordCache. I guess if readCommitted==false, then we
> > will
> > > >> > > continue
> > > >> > > > > to
> > > >> > > > > > >> read
> > > >> > > > > > >>> from the cache first, then the Batch, then the store;
> > and
> > > if
> > > >> > > > > > >>> readCommitted==true, we would skip the cache and the
> > Batch
> > > >> and
> > > >> > > only
> > > >> > > > > > read
> > > >> > > > > > >>> from the persistent RocksDB store?
> > > >> > > > > > >>>
> > > >> > > > > > >>> What should IQ do if I request to readCommitted on a
> > > >> > > > > non-transactional
> > > >> > > > > > >>> store?
> > > >> > > > > > >>>
> > > >> > > > > > >>> Thanks again for proposing the KIP, and my apologies
> for
> > > the
> > > >> > long
> > > >> > > > > > reply;
> > > >> > > > > > >>> I'm hoping to air all my concerns in one "batch" to
> save
> > > >> time
> > > >> > for
> > > >> > > > > you.
> > > >> > > > > > >>>
> > > >> > > > > > >>> Thanks,
> > > >> > > > > > >>> -John
> > > >> > > > > > >>>
> > > >> > > > > > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov
> > > wrote:
> > > >> > > > > > >>>> Hi all,
> > > >> > > > > > >>>>
> > > >> > > > > > >>>> I've written a KIP for making Kafka Streams state
> > stores
> > > >> > > > > transactional
> > > >> > > > > > >>> and
> > > >> > > > > > >>>> would like to start a discussion:
> > > >> > > > > > >>>>
> > > >> > > > > > >>>>
> > > >> > > > > > >>>
> > > >> > > > > > >>
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > > >> > > > > > >>>>
> > > >> > > > > > >>>> Best,
> > > >> > > > > > >>>> Alex
> > > >> > > > > > >>>
> > > >> > > > > > >>
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > --
> > > >> > > > >
> > > >> > > > > Suhas Satish
> > > >> > > > > Engineering Manager, Confluent
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > > --
> > > >> > > -- Guozhang
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
