Hey Guozhang,

> 1) About the param passed into the `recover()` function: it seems to me
> that the semantics of "recover(offset)" is: recover this state to a
> transaction boundary which is at least the passed-in offset. And the only
> possibility that the returned offset is different than the passed-in offset
> is that if the previous failure happens after we've done all the commit
> procedures except writing the new checkpoint, in which case the returned
> offset would be larger than the passed-in offset. Otherwise it should
> always be equal to the passed-in offset, is that right?
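To make the intended contract concrete, here is a minimal, illustrative Java sketch of the commit/recover semantics (the class, its fields, and the marker handling are assumptions made up for illustration, not the prototype's actual code; only the method names mirror the proposed StateStore#commit/#recover):

```java
import java.util.OptionalLong;

// Illustrative sketch only: a marker-based transactional store. The real
// prototype persists data and markers durably; here plain fields stand in.
public class TransactionalStoreSketch {

    private long committedOffset = -1L;                       // last durable commit
    private OptionalLong commitMarker = OptionalLong.empty(); // present only mid-commit

    // Atomically make pending writes durable up to the given changelog offset.
    public void commit(final long changelogOffset) {
        commitMarker = OptionalLong.of(changelogOffset); // 1. persist the marker
        committedOffset = changelogOffset;               // 2. make the writes durable
        commitMarker = OptionalLong.empty();             // 3. delete the marker
    }

    // Bring the store to a transaction boundary at least at the passed-in
    // offset and return that boundary.
    public long recover(final long checkpointedOffset) {
        if (commitMarker.isPresent()) {
            // Crash happened *during* commit: roll forward to the marker's
            // offset, which may be larger than the checkpointed one.
            committedOffset = commitMarker.getAsLong();
            commitMarker = OptionalLong.empty();
            return committedOffset;
        }
        // No marker: we are at the checkpointed boundary. The caller replays
        // the changelog from here, which is safe because replay is idempotent.
        return checkpointedOffset;
    }

    public long committedOffset() {
        return committedOffset;
    }
}
```

Under this sketch, `recover` returns an offset larger than the passed one only when the commit marker survived a crash in the middle of `commit`; because `commit` deletes the marker as its last step, any later crash makes `recover` return the passed offset and lean on idempotent changelog replay.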
Right now, the only case when `recover` returns an offset different from the passed one is when the failure happens *during* commit. If the failure happens after commit but before the checkpoint, `recover` might return either the passed offset or a newer committed offset, depending on the implementation. The `recover` implementation in the prototype returns the passed offset because it deletes the commit marker that holds the committed offset once the commit is done. In that case, the store will replay the last commit from the changelog. I think that is fine, as the changelog replay is idempotent.

> 2) It seems the only use for the "transactional()" function is to determine
> if we can update the checkpoint file while in EOS.

Right now, there are 2 other uses for `transactional()`:
1. To determine what to do during initialization if the checkpoint is gone (see [1]). If the state store is transactional, we don't have to wipe the existing data. Thinking about it now, we do not really need to check whether the store is `transactional` here, because if it were not, we would not have written the checkpoint in the first place. I am going to remove that check.
2. To determine whether the persistent kv store in KStreamImplJoin should be transactional (see [2], [3]).

I am not sure if we can get rid of the checks in point 2. If we can, I'd be happy to encapsulate the `transactional()` logic in `commit`/`recover`.

Best,
Alex

1. https://github.com/apache/kafka/pull/12393/files#diff-971d9ef7ea8aefffff687fc7ee131bd166ced94445f4ab55aa83007541dccfdaL256-R281
2. https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R266-R278
3. https://github.com/apache/kafka/pull/12393/files#diff-9ce43046fdef1233ab762e728abd1d3d44d7c270b28dcf6b63aa31a93a30af07R348-R354

On Tue, Jul 26, 2022 at 6:39 PM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Alex,
>
> Excellent proposal, I'm very keen to see this land!
> > Would it be useful to permit configuring the type of store used for > uncommitted offsets on a store-by-store basis? This way, users could choose > whether to use, e.g. an in-memory store or RocksDB, potentially reducing > the overheads associated with RocksDb for smaller stores, but without the > memory pressure issues? > > I suspect that in most cases, the number of uncommitted records will be > very small, because the default commit interval is 100ms. > > Regards, > > Nick > > On Tue, 26 Jul 2022 at 01:36, Guozhang Wang <wangg...@gmail.com> wrote: > > > Hello Alex, > > > > Thanks for the updated KIP, I looked over it and browsed the WIP and just > > have a couple meta thoughts: > > > > 1) About the param passed into the `recover()` function: it seems to me > > that the semantics of "recover(offset)" is: recover this state to a > > transaction boundary which is at least the passed-in offset. And the only > > possibility that the returned offset is different than the passed-in > offset > > is that if the previous failure happens after we've done all the commit > > procedures except writing the new checkpoint, in which case the returned > > offset would be larger than the passed-in offset. Otherwise it should > > always be equal to the passed-in offset, is that right? > > > > 2) It seems the only use for the "transactional()" function is to > determine > > if we can update the checkpoint file while in EOS. But the purpose of the > > checkpoint file's offsets is just to tell "the local state's current > > snapshot's progress is at least the indicated offsets" anyways, and with > > this KIP maybe we would just do: > > > > a) when in ALOS, upon failover: we set the starting offset as > > checkpointed-offset, then restore() from changelog till the end-offset. > > This way we may restore some records twice. 
> > b) when in EOS, upon failover: we first call > recover(checkpointed-offset), > > then set the starting offset as the returned offset (which may be larger > > than checkpointed-offset), then restore until the end-offset. > > > > So why not also: > > c) we let the `commit()` function to also return an offset, which > indicates > > "checkpointable offsets". > > d) for existing non-transactional stores, we just have a default > > implementation of "commit()" which is simply a flush, and returns a > > sentinel value like -1. Then later if we get checkpointable offsets -1, > we > > do not write the checkpoint. Upon clean shutting down we can just > > checkpoint regardless of the returned value from "commit". > > e) for existing non-transactional stores, we just have a default > > implementation of "recover()" which is to wipe out the local store and > > return offset 0 if the passed in offset is -1, otherwise if not -1 then > it > > indicates a clean shutdown in the last run, and this function is just a > > no-op. > > > > In that case, we would not need the "transactional()" function anymore, > > since for non-transactional stores their behaviors are still wrapped in > the > > `commit / recover` function pairs. > > > > I have not completed the thorough pass on your WIP PR, so maybe I could > > come up with some more feedback later, but just let me know if my > > understanding above is correct or not? > > > > > > Guozhang > > > > > > > > > > On Thu, Jul 14, 2022 at 7:01 AM Alexander Sorokoumov > > <asorokou...@confluent.io.invalid> wrote: > > > > > Hi, > > > > > > I updated the KIP with the following changes: > > > * Replaced in-memory batches with the secondary-store approach as the > > > default implementation to address the feedback about memory pressure as > > > suggested by Sagar and Bruno. > > > * Introduced StateStore#commit and StateStore#recover methods as an > > > extension of the rollback idea. 
@Guozhang, please see the comment below > > on > > > why I took a slightly different approach than you suggested. > > > * Removed mentions of changes to IQv1 and IQv2. Transactional state > > stores > > > enable reading committed in IQ, but it is really an independent feature > > > that deserves its own KIP. Conflating them unnecessarily increases the > > > scope for discussion, implementation, and testing in a single unit of > > work. > > > > > > I also published a prototype - > > https://github.com/apache/kafka/pull/12393 > > > that implements changes described in the proposal. > > > > > > Regarding explicit rollback, I think it is a powerful idea that allows > > > other StateStore implementations to take a different path to the > > > transactional behavior rather than keeping 2 state stores. Instead of > > > introducing a new commit token, I suggest using a changelog offset that > > > already 1:1 corresponds to the materialized state. This works nicely > > > because Kafka Streams first commits an AK transaction and only then > > > checkpoints the state store, so we can use the changelog offset to commit > > > the state store transaction. > > > > > > I called the method StateStore#recover rather than StateStore#rollback > > > because a state store might either roll back or forward depending on > the > > > specific point of the crash failure. Consider the write algorithm in > Kafka > > > Streams: > > > 1. write stuff to the state store > > > 2. producer.sendOffsetsToTransaction(token); > > producer.commitTransaction(); > > > 3. flush > > > 4. checkpoint > > > > > > Let's consider 3 cases: > > > 1. If the crash failure happens between #2 and #3, the state store > rolls > > > back and replays the uncommitted transaction from the changelog. > > > 2. If the crash failure happens during #3, the state store can roll > > forward > > > and finish the flush/commit. > > > 3. 
If the crash failure happens between #3 and #4, the state store > should > > > do nothing during recovery and just proceed with the checkpoint. > > > > > > Looking forward to your feedback, > > > Alexander > > > > > > On Wed, Jun 8, 2022 at 12:16 AM Alexander Sorokoumov < > > > asorokou...@confluent.io> wrote: > > > > > > > Hi, > > > > > > > > As a status update, I did the following changes to the KIP: > > > > * replaced configuration via the top-level config with configuration > > via > > > > Stores factory and StoreSuppliers, > > > > * added IQv2 and elaborated how readCommitted will work when the > store > > is > > > > not transactional, > > > > * removed claims about ALOS. > > > > > > > > I am going to be OOO in the next couple of weeks and will resume > > working > > > > on the proposal and responding to the discussion in this thread > > starting > > > > June 27. My next top priorities are: > > > > 1. Prototype the rollback approach as suggested by Guozhang. > > > > 2. Replace in-memory batches with the secondary-store approach as the > > > > default implementation to address the feedback about memory pressure > as > > > > suggested by Sagar and Bruno. > > > > 3. Adjust Stores methods to make transactional implementations > > pluggable. > > > > 4. Publish the POC for the first review. > > > > > > > > Best regards, > > > > Alex > > > > > > > > On Wed, Jun 1, 2022 at 2:52 PM Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > >> Alex, > > > >> > > > >> Thanks for your replies! That is very helpful. > > > >> > > > >> Just to broaden our discussions a bit here, I think there are some > > other > > > >> approaches in parallel to the idea of "enforce to only persist upon > > > >> explicit flush" and I'd like to throw one here -- not really > > advocating > > > >> it, > > > >> but just for us to compare the pros and cons: > > > >> > > > >> 1) We let the StateStore's `flush` function to return a token > instead > > of > > > >> returning `void`. 
> > > >> 2) We add another `rollback(token)` interface of StateStore which > > would > > > >> effectively rollback the state as indicated by the token to the > > snapshot > > > >> when the corresponding `flush` is called. > > > >> 3) We encode the token and commit as part of > > > >> `producer#sendOffsetsToTransaction`. > > > >> > > > >> Users could optionally implement the new functions, or they can just > > not > > > >> return the token at all and not implement the second function. > Again, > > > the > > > >> APIs are just for the sake of illustration, not feeling they are the > > > most > > > >> natural :) > > > >> > > > >> Then the procedure would be: > > > >> > > > >> 1. the previous checkpointed offset is 100 > > > >> ... > > > >> 3. flush store, make sure all writes are persisted; get the returned > > > token > > > >> that indicates the snapshot of 200. > > > >> 4. producer.sendOffsetsToTransaction(token); > > > producer.commitTransaction(); > > > >> 5. Update the checkpoint file (say, the new value is 200). > > > >> > > > >> Then if there's a failure, say between 3/4, we would get the token > > from > > > >> the > > > >> last committed txn, and first we would do the restoration (which may > > get > > > >> the state to somewhere between 100 and 200), then call > > > >> `store.rollback(token)` to rollback to the snapshot of offset 100. > > > >> > > > >> The pros is that we would then not need to enforce the state stores > to > > > not > > > >> persist any data during the txn: for stores that may not be able to > > > >> implement the `rollback` function, they can still reduce its impl to > > > "not > > > >> persisting any data" via this API, but for stores that can indeed > > > support > > > >> the rollback, their implementation may be more efficient. 
The cons > > > though, > > > >> on top of my head are 1) more complicated logic differentiating > > between > > > >> EOS > > > >> with and without store rollback support, and ALOS, 2) encoding the > > token > > > >> as > > > >> part of the commit offset is not ideal if it is big, 3) the recovery > > > logic > > > >> including the state store is also a bit more complicated. > > > >> > > > >> > > > >> Guozhang > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> On Wed, Jun 1, 2022 at 1:29 PM Alexander Sorokoumov > > > >> <asorokou...@confluent.io.invalid> wrote: > > > >> > > > >> > Hi Guozhang, > > > >> > > > > >> > But I'm still trying to clarify how it guarantees EOS, and it > seems > > > >> that we > > > >> > > would achieve it by enforcing to not persist any data written > > within > > > >> this > > > >> > > transaction until step 4. Is that correct? > > > >> > > > > >> > > > > >> > This is correct. Both alternatives - in-memory WriteBatchWithIndex > > and > > > >> > transactionality via the secondary store guarantee EOS by not > > > persisting > > > >> > data in the "main" state store until it is committed in the > > changelog > > > >> > topic. > > > >> > > > > >> > Oh what I meant is not what KStream code does, but that StateStore > > > impl > > > >> > > classes themselves could potentially flush data to become > > persisted > > > >> > > asynchronously > > > >> > > > > >> > > > > >> > Thank you for elaborating! You are correct, the underlying state > > store > > > >> > should not persist data until the streams app calls > > StateStore#flush. > > > >> There > > > >> > are 2 options how a State Store implementation can guarantee that > - > > > >> either > > > >> > keep uncommitted writes in memory or be able to roll back the > > changes > > > >> that > > > >> > were not committed during recovery. RocksDB's WriteBatchWithIndex > is > > > an > > > >> > implementation of the first option. 
A considered alternative, > > > >> Transactions > > > >> > via Secondary State Store for Uncommitted Changes, is the way to > > > >> implement > > > >> > the second option. > > > >> > > > > >> > As everyone correctly pointed out, keeping uncommitted data in > > memory > > > >> > introduces a very real risk of OOM that we will need to handle. > The > > > >> more I > > > >> > think about it, the more I lean towards going with the > Transactions > > > via > > > >> > Secondary Store as the way to implement transactionality as it > does > > > not > > > >> > have that issue. > > > >> > > > > >> > Best, > > > >> > Alex > > > >> > > > > >> > > > > >> > On Wed, Jun 1, 2022 at 12:59 PM Guozhang Wang <wangg...@gmail.com > > > > > >> wrote: > > > >> > > > > >> > > Hello Alex, > > > >> > > > > > >> > > > we flush the cache, but not the underlying state store. > > > >> > > > > > >> > > You're right. The ordering I mentioned above is actually: > > > >> > > > > > >> > > ... > > > >> > > 3. producer.sendOffsetsToTransaction(); > > > producer.commitTransaction(); > > > >> > > 4. flush store, make sure all writes are persisted. > > > >> > > 5. Update the checkpoint file to 200. > > > >> > > > > > >> > > But I'm still trying to clarify how it guarantees EOS, and it > > seems > > > >> that > > > >> > we > > > >> > > would achieve it by enforcing to not persist any data written > > within > > > >> this > > > >> > > transaction until step 4. Is that correct? > > > >> > > > > > >> > > > Can you please point me to the place in the codebase where we > > > >> trigger > > > >> > > async flush before the commit? > > > >> > > > > > >> > > Oh what I meant is not what KStream code does, but that > StateStore > > > >> impl > > > >> > > classes themselves could potentially flush data to become > > persisted > > > >> > > asynchronously, e.g. RocksDB does that naturally out of the > > control > > > of > > > >> > > KStream code. 
I think it is related to my previous question: if > we > > > >> think > > > >> > by > > > >> > > guaranteeing EOS at the state store level, we would effectively > > ask > > > >> the > > > >> > > impl classes that "you should not persist any data until `flush` > > is > > > >> > called > > > >> > > explicitly", is the StateStore interface the right level to > > enforce > > > >> such > > > >> > > mechanisms, or should we just do that on top of the StateStores, > > > e.g. > > > >> > > during the transaction we just keep all the writes in the cache > > (of > > > >> > course > > > >> > > we need to consider how to work around memory pressure as > > previously > > > >> > > mentioned), and then upon committing, we just write the cached > > > records > > > >> > as a > > > >> > > whole into the store and then call flush. > > > >> > > > > > >> > > > > > >> > > Guozhang > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > On Tue, May 31, 2022 at 4:08 PM Alexander Sorokoumov > > > >> > > <asorokou...@confluent.io.invalid> wrote: > > > >> > > > > > >> > > > Hey, > > > >> > > > > > > >> > > > Thank you for the wealth of great suggestions and questions! I > > am > > > >> going > > > >> > > to > > > >> > > > address the feedback in batches and update the proposal async, > > as > > > >> it is > > > >> > > > probably going to be easier for everyone. I will also write a > > > >> separate > > > >> > > > message after making updates to the KIP. > > > >> > > > > > > >> > > > @John, > > > >> > > > > > > >> > > > > Did you consider instead just adding the option to the > > > >> > > > > RocksDB*StoreSupplier classes and the factories in Stores ? > > > >> > > > > > > >> > > > Thank you for suggesting that. I think that this idea is > better > > > than > > > >> > > what I > > > >> > > > came up with and will update the KIP with configuring > > > >> transactionality > > > >> > > via > > > >> > > > the suppliers and Stores. 
> > > >> > > > > > > >> > > > what is the advantage over just doing the same thing with the > > > >> > RecordCache > > > >> > > > > and not introducing the WriteBatch at all? > > > >> > > > > > > >> > > > Can you point me to RecordCache? I can't find it in the > project. > > > The > > > >> > > > advantage would be that WriteBatch guarantees write atomicity. > > As > > > >> far > > > >> > as > > > >> > > I > > > >> > > > understood the way RecordCache works, it might leave the > system > > in > > > >> an > > > >> > > > inconsistent state during crash failure on write. > > > >> > > > > > > >> > > > You mentioned that a transactional store can help reduce > > > >> duplication in > > > >> > > the > > > >> > > > > case of ALOS > > > >> > > > > > > >> > > > I will remove claims about ALOS from the proposal. Thank you > for > > > >> > > > elaborating! > > > >> > > > > > > >> > > > As a reminder, we have a new IQv2 mechanism now. Should we > > propose > > > >> any > > > >> > > > > changes to IQv1 to support this transactional mechanism, > > versus > > > >> just > > > >> > > > > proposing it for IQv2? Certainly, it seems strange only to > > > >> propose a > > > >> > > > change > > > >> > > > > for IQv1 and not v2. > > > >> > > > > > > >> > > > > > > >> > > > I will update the proposal with complementary API changes for > > > IQv2 > > > >> > > > > > > >> > > > What should IQ do if I request to readCommitted on a > > > >> non-transactional > > > >> > > > > store? > > > >> > > > > > > >> > > > We can assume that non-transactional stores commit on write, > so > > IQ > > > >> > works > > > >> > > in > > > >> > > > the same way with non-transactional stores regardless of the > > value > > > >> of > > > >> > > > readCommitted. 
> > > >> > > > > > > >> > > > > > > >> > > > @Guozhang, > > > >> > > > > > > >> > > > * If we crash between line 3 and 4, then at that time the > local > > > >> > > persistent > > > >> > > > > store image is representing as of offset 200, but upon > > recovery > > > >> all > > > >> > > > > changelog records from 100 to log-end-offset would be > > considered > > > >> as > > > >> > > > aborted > > > >> > > > > and not be replayed and we would restart processing from > > > position > > > >> > 100. > > > >> > > > > Restart processing will violate EOS.I'm not sure how e.g. > > > >> RocksDB's > > > >> > > > > WriteBatchWithIndex would make sure that the step 4 and > step 5 > > > >> could > > > >> > be > > > >> > > > > done atomically here. > > > >> > > > > > > >> > > > > > > >> > > > Could you please point me to the place in the codebase where a > > > task > > > >> > > flushes > > > >> > > > the store before committing the transaction? > > > >> > > > Looking at TaskExecutor ( > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167 > > > >> > > > ), > > > >> > > > StreamTask#prepareCommit ( > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398 > > > >> > > > ), > > > >> > > > and CachedStateStore ( > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34 > > > >> > > > ) > > > >> > > > we flush the cache, but not the underlying state store. 
> Explicit > > > >> > > > StateStore#flush happens in AbstractTask#maybeWriteCheckpoint > ( > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99 > > > >> > > > ). > > > >> > > > Is there something I am missing here? > > > >> > > > > > > >> > > > Today all cached data that have not been flushed are not > > committed > > > >> for > > > >> > > > > sure, but even flushed data to the persistent underlying > store > > > may > > > >> > also > > > >> > > > be > > > >> > > > > uncommitted since flushing can be triggered asynchronously > > > before > > > >> the > > > >> > > > > commit. > > > >> > > > > > > >> > > > Can you please point me to the place in the codebase where we > > > >> trigger > > > >> > > async > > > >> > > > flush before the commit? This would certainly be a reason to > > > >> introduce > > > >> > a > > > >> > > > dedicated StateStore#commit method. > > > >> > > > > > > >> > > > Thanks again for the feedback. I am going to update the KIP > and > > > then > > > >> > > > respond to the next batch of questions and suggestions. > > > >> > > > > > > >> > > > Best, > > > >> > > > Alex > > > >> > > > > > > >> > > > On Mon, May 30, 2022 at 5:13 PM Suhas Satish > > > >> > > <ssat...@confluent.io.invalid > > > >> > > > > > > > >> > > > wrote: > > > >> > > > > > > >> > > > > Thanks for the KIP proposal Alex. > > > >> > > > > 1. Configuration default > > > >> > > > > > > > >> > > > > You mention applications using streams DSL with built-in > > rocksDB > > > >> > state > > > >> > > > > store will get transactional state stores by default when > EOS > > is > > > >> > > enabled, > > > >> > > > > but the default implementation for apps using PAPI will > > fallback > > > >> to > > > >> > > > > non-transactional behavior. 
> > > >> > > > > Shouldn't we have the same default behavior for both types > of > > > >> apps - > > > >> > > DSL > > > >> > > > > and PAPI? > > > >> > > > > > > > >> > > > > On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna < > > > cado...@apache.org > > > >> > > > > >> > > > wrote: > > > >> > > > > > > > >> > > > > > Thanks for the PR, Alex! > > > >> > > > > > > > > >> > > > > > I am also glad to see this coming. > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > 1. Configuration > > > >> > > > > > > > > >> > > > > > I would also prefer to restrict the configuration of > > > >> transactional > > > >> > on > > > >> > > > > > the state sore. Ideally, calling method transactional() on > > the > > > >> > state > > > >> > > > > > store would be enough. An option on the store builder > would > > > >> make it > > > >> > > > > > possible to turn transactionality on and off (as John > > > proposed). > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > 2. Memory usage in RocksDB > > > >> > > > > > > > > >> > > > > > This seems to be a major issue. We do not have any > guarantee > > > >> that > > > >> > > > > > uncommitted writes fit into memory and I guess we will > never > > > >> have. > > > >> > > What > > > >> > > > > > happens when the uncommitted writes do not fit into > memory? > > > Does > > > >> > > > RocksDB > > > >> > > > > > throw an exception? Can we handle such an exception > without > > > >> > crashing? > > > >> > > > > > > > > >> > > > > > Does the RocksDB behavior even need to be included in this > > > KIP? > > > >> In > > > >> > > the > > > >> > > > > > end it is an implementation detail. > > > >> > > > > > > > > >> > > > > > What we should consider - though - is a memory limit in > some > > > >> form. > > > >> > > And > > > >> > > > > > what we do when the memory limit is exceeded. > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > 3. 
PoC > > > >> > > > > > > > > >> > > > > > I agree with Guozhang that a PoC is a good idea to better > > > >> > understand > > > >> > > > the > > > >> > > > > > devils in the details. > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > Best, > > > >> > > > > > Bruno > > > >> > > > > > > > > >> > > > > > On 25.05.22 01:52, Guozhang Wang wrote: > > > >> > > > > > > Hello Alex, > > > >> > > > > > > > > > >> > > > > > > Thanks for writing the proposal! Glad to see it coming. > I > > > >> think > > > >> > > this > > > >> > > > is > > > >> > > > > > the > > > >> > > > > > > kind of a KIP that since too many devils would be buried > > in > > > >> the > > > >> > > > details > > > >> > > > > > and > > > >> > > > > > > it's better to start working on a POC, either in > parallel, > > > or > > > >> > > before > > > >> > > > we > > > >> > > > > > > resume our discussion, rather than blocking any > > > implementation > > > >> > > until > > > >> > > > we > > > >> > > > > > are > > > >> > > > > > > satisfied with the proposal. > > > >> > > > > > > > > > >> > > > > > > Just as a concrete example, I personally am still not > 100% > > > >> clear > > > >> > > how > > > >> > > > > the > > > >> > > > > > > proposal would work to achieve EOS with the state > stores. > > > For > > > >> > > > example, > > > >> > > > > > the > > > >> > > > > > > commit procedure today looks like this: > > > >> > > > > > > > > > >> > > > > > > 0: there's an existing checkpoint file indicating the > > > >> changelog > > > >> > > > offset > > > >> > > > > of > > > >> > > > > > > the local state store image is 100. Now a commit is > > > triggered: > > > >> > > > > > > 1. flush cache (since it contains partially processed > > > >> records), > > > >> > > make > > > >> > > > > sure > > > >> > > > > > > all records are written to the producer. > > > >> > > > > > > 2. flush producer, making sure all changelog records > have > > > now > > > >> > > acked. 
> > > >> > > > // > > > >> > > > > > > here we would get the new changelog position, say 200 > > > >> > > > > > > 3. flush store, make sure all writes are persisted. > > > >> > > > > > > 4. producer.sendOffsetsToTransaction(); > > > >> > > producer.commitTransaction(); > > > >> > > > > // > > > >> > > > > > we > > > >> > > > > > > would make the writes in changelog up to offset 200 > > > committed > > > >> > > > > > > 5. Update the checkpoint file to 200. > > > >> > > > > > > > > > >> > > > > > > The question about atomicity between those lines, for > > > example: > > > >> > > > > > > > > > >> > > > > > > * If we crash between line 4 and line 5, the local > > > checkpoint > > > >> > file > > > >> > > > > would > > > >> > > > > > > stay as 100, and upon recovery we would replay the > > changelog > > > >> from > > > >> > > 100 > > > >> > > > > to > > > >> > > > > > > 200. This is not ideal but does not violate EOS, since > the > > > >> > > changelogs > > > >> > > > > are > > > >> > > > > > > all overwrites anyways. > > > >> > > > > > > * If we crash between line 3 and 4, then at that time > the > > > >> local > > > >> > > > > > persistent > > > >> > > > > > > store image is representing as of offset 200, but upon > > > >> recovery > > > >> > all > > > >> > > > > > > changelog records from 100 to log-end-offset would be > > > >> considered > > > >> > as > > > >> > > > > > aborted > > > >> > > > > > > and not be replayed and we would restart processing from > > > >> position > > > >> > > > 100. > > > >> > > > > > > Restart processing will violate EOS.I'm not sure how > e.g. > > > >> > RocksDB's > > > >> > > > > > > WriteBatchWithIndex would make sure that the step 4 and > > > step 5 > > > >> > > could > > > >> > > > be > > > >> > > > > > > done atomically here. 
> > > >> > > > > > > > > > >> > > > > > > Originally what I was thinking when creating the JIRA > > ticket > > > >> is > > > >> > > that > > > >> > > > we > > > >> > > > > > > need to let the state store to provide a transactional > API > > > >> like > > > >> > > > "token > > > >> > > > > > > commit()" used in step 4) above which returns a token, > > that > > > >> e.g. > > > >> > in > > > >> > > > our > > > >> > > > > > > example above indicates offset 200, and that token would > > be > > > >> > written > > > >> > > > as > > > >> > > > > > part > > > >> > > > > > > of the records in Kafka transaction in step 5). And upon > > > >> recovery > > > >> > > the > > > >> > > > > > state > > > >> > > > > > > store would have another API like "rollback(token)" > where > > > the > > > >> > token > > > >> > > > is > > > >> > > > > > read > > > >> > > > > > > from the latest committed txn, and be used to rollback > the > > > >> store > > > >> > to > > > >> > > > > that > > > >> > > > > > > committed image. I think your proposal is different, and > > it > > > >> seems > > > >> > > > like > > > >> > > > > > > you're proposing we swap step 3) and 4) above, but the > > > >> atomicity > > > >> > > > issue > > > >> > > > > > > still remains since now you may have the store image at > > 100 > > > >> but > > > >> > the > > > >> > > > > > > changelog is committed at 200. I'd like to learn more > > about > > > >> the > > > >> > > > details > > > >> > > > > > > on how it resolves such issues. > > > >> > > > > > > > > > >> > > > > > > Anyways, that's just an example to make the point that > > there > > > >> are > > > >> > > lots > > > >> > > > > of > > > >> > > > > > > implementational details which would drive the public > API > > > >> design, > > > >> > > and > > > >> > > > > we > > > >> > > > > > > should probably first do a POC, and come back to discuss > > the > > > >> KIP. > > > >> > > Let > > > >> > > > > me > > > >> > > > > > > know what you think? 
> > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > Guozhang > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > On Tue, May 24, 2022 at 10:35 AM Sagar < > > > >> > sagarmeansoc...@gmail.com> > > > >> > > > > > wrote: > > > >> > > > > > > > > > >> > > > > > >> Hi Alexander, > > > >> > > > > > >> > > > >> > > > > > >> Thanks for the KIP! This seems like a great proposal. I > > > have > > > >> the > > > >> > > > same > > > >> > > > > > >> opinion as John on the Configuration part though. I > think > > > >> the 2 > > > >> > > > level > > > >> > > > > > >> config and its behaviour based on the setting/unsetting > > of > > > >> the > > > >> > > flag > > > >> > > > > > seems > > > >> > > > > > >> confusing to me as well. Since the KIP seems > specifically > > > >> > centred > > > >> > > > > around > > > >> > > > > > >> RocksDB it might be better to add it at the Supplier > > level > > > as > > > >> > John > > > >> > > > > > >> suggested. > > > >> > > > > > >> > > > >> > > > > > >> On similar lines, this config name => > > > >> > > > > > *statestore.transactional.mechanism > > > >> > > > > > >> *may > > > >> > > > > > >> also need rethinking as the value assigned to > > > >> > > it(rocksdb_indexbatch) > > > >> > > > > > >> implicitly seems to assume that rocksdb is the only > > > >> statestore > > > >> > > that > > > >> > > > > > Kafka > > > >> > > > > > >> Stream supports while that's not the case. > > > >> > > > > > >> > > > >> > > > > > >> Also, regarding the potential memory pressure that can > be > > > >> > > introduced > > > >> > > > > by > > > >> > > > > > >> WriteBatchIndex, do you think it might make more sense > to > > > >> > include > > > >> > > > some > > > >> > > > > > >> numbers/benchmarks on how much the memory consumption > > might > > > >> > > > increase? 
> Lastly, the read_uncommitted flag's behaviour on IQ may need more
> elaboration.
>
> These points aside, as I said, this is a great proposal!
>
> Thanks!
> Sagar.
>
> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
>
> Thanks for the KIP, Alex!
>
> I'm really happy to see your proposal. This improvement fills a
> long-standing gap.
>
> I have a few questions:
>
> 1. Configuration
> The KIP only mentions RocksDB, but of course, Streams also ships with an
> InMemory store, and users also plug in their own custom state stores. It
> is also common to use multiple types of state stores in the same
> application for different purposes.
>
> Against this backdrop, the choice to configure transactionality as a
> top-level config, as well as to configure the store transaction mechanism
> as a top-level config, seems a bit off.
>
> Did you consider instead just adding the option to the
> RocksDB*StoreSupplier classes and the factories in Stores?
> It seems like the desire to enable the feature by default, but with a
> feature-flag to disable it, was a factor here. However, as you pointed
> out, there are some major considerations that users should be aware of, so
> opt-in doesn't seem like a bad choice, either. You could add an Enum
> argument to those factories like `RocksDBTransactionalMechanism.{NONE,
>
> Some points in favor of this approach:
> * Avoid "stores that don't support transactions ignore the config"
>   complexity
> * Users can choose how to spend their memory budget, making some stores
>   transactional and others not
> * When we add transactional support to in-memory stores, we don't have to
>   figure out what to do with the mechanism config (i.e., what do you set
>   the mechanism to when there are multiple kinds of transactional stores
>   in the topology?)
>
> 2. caching/flushing/transactions
> The coupling between memory usage and flushing that you mentioned is a
> bit troubling.
> It also occurs to me that there seems to be some relationship with the
> existing record cache, which is also an in-memory holding area for records
> that are not yet written to the cache and/or store (albeit with no
> particular semantics). Have you considered how all these components should
> relate? For example, should a "full" WriteBatch actually trigger a flush
> so that we don't get OOMEs? If the proposed transactional mechanism forces
> all uncommitted writes to be buffered in memory until a commit, then what
> is the advantage over just doing the same thing with the RecordCache and
> not introducing the WriteBatch at all?
>
> 3. ALOS
> You mentioned that a transactional store can help reduce duplication in
> the case of ALOS. We might want to be careful about claims like that.
> Duplication isn't the way that repeated processing manifests in state
> stores. Rather, it is in the form of dirty reads during reprocessing. This
> feature may reduce the incidence of dirty reads during reprocessing, but
> not in a predictable way.
> During regular processing today, we will send some records through to the
> changelog in between commit intervals. Under ALOS, if any of those dirty
> writes gets committed to the changelog topic, then upon failure we have to
> roll the store forward to them anyway, regardless of this new
> transactional mechanism. That's a fixable problem, by the way, but this
> KIP doesn't seem to fix it. I wonder if we should make any claims about
> the relationship of this feature to ALOS if the real-world behavior is so
> complex.
>
> 4. IQ
> As a reminder, we have a new IQv2 mechanism now. Should we propose any
> changes to IQv1 to support this transactional mechanism, versus just
> proposing it for IQv2? Certainly, it seems strange only to propose a
> change for IQv1 and not v2.
>
> Regarding your proposal for IQv1, I'm unsure what the behavior should be
> for readCommitted, since the current behavior also reads out of the
> RecordCache.
> I guess if readCommitted==false, then we will continue to read from the
> cache first, then the Batch, then the store; and if readCommitted==true,
> we would skip the cache and the Batch and only read from the persistent
> RocksDB store?
>
> What should IQ do if I request readCommitted on a non-transactional
> store?
>
> Thanks again for proposing the KIP, and my apologies for the long reply;
> I'm hoping to air all my concerns in one "batch" to save time for you.
>
> Thanks,
> -John
>
> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
>
> Hi all,
>
> I've written a KIP for making Kafka Streams state stores transactional
> and would like to start a discussion:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
>
> Best,
> Alex
>
> --
> Suhas Satish
> Engineering Manager
> Confluent <https://www.confluent.io>
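The read order John hypothesizes for IQv1 can be modeled with the toy below. The `get` helper and the three maps standing in for the RecordCache, the uncommitted WriteBatch, and the persistent RocksDB store are illustrative assumptions, not Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the hypothesized IQv1 read path; all names are illustrative.
public class ReadPathSketch {

    static String get(String key, boolean readCommitted,
                      Map<String, String> recordCache,   // stand-in for the RecordCache
                      Map<String, String> writeBatch,    // stand-in for uncommitted batch writes
                      Map<String, String> rocksDb) {     // stand-in for the persistent store
        if (!readCommitted) {
            // readCommitted == false: cache first, then the uncommitted batch, then the store.
            if (recordCache.containsKey(key)) return recordCache.get(key);
            if (writeBatch.containsKey(key)) return writeBatch.get(key);
        }
        // readCommitted == true: skip cache and batch; only committed data is visible.
        return rocksDb.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>(Map.of("k", "dirty"));
        Map<String, String> batch = new HashMap<>();
        Map<String, String> store = new HashMap<>(Map.of("k", "committed"));
        System.out.println(get("k", false, cache, batch, store)); // prints "dirty"
        System.out.println(get("k", true, cache, batch, store));  // prints "committed"
    }
}
```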