Hey,

Thank you for the wealth of great suggestions and questions! I am going to address the feedback in batches and update the proposal asynchronously, as that is probably easier for everyone. I will also write a separate message after making updates to the KIP.
@John,

> Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores?

Thank you for suggesting that. I think this idea is better than what I came up with, and I will update the KIP to configure transactionality via the suppliers and Stores.

> What is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?

Can you point me to RecordCache? I can't find it in the project. The advantage would be that WriteBatch guarantees write atomicity. As far as I understand how RecordCache works, it might leave the system in an inconsistent state if we crash during a write.

> You mentioned that a transactional store can help reduce duplication in the case of ALOS

I will remove the claims about ALOS from the proposal. Thank you for elaborating!

> As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.

I will update the proposal with complementary API changes for IQv2.

> What should IQ do if I request to readCommitted on a non-transactional store?

We can assume that non-transactional stores commit on write, so IQ works the same way with non-transactional stores regardless of the value of readCommitted.

@Guozhang,

> * If we crash between line 3 and 4, then at that time the local persistent store image is representing as of offset 200, but upon recovery all changelog records from 100 to log-end-offset would be considered as aborted and not be replayed and we would restart processing from position 100. Restart processing will violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that the step 4 and step 5 could be done atomically here.
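For context on the quoted question, here is a minimal toy model of the WriteBatchWithIndex semantics under discussion. This is plain Java, not RocksDB's actual API: writes are buffered in memory, uncommitted reads see the buffer first (read-your-own-writes), and commit applies the whole buffer to the underlying store at once.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of WriteBatchWithIndex-style buffering: NOT the RocksDB API,
// just an illustration of read-your-own-writes plus atomic apply-on-commit.
class ToyTransactionalStore {
    private final Map<String, String> committed = new HashMap<>(); // stands in for RocksDB
    private final Map<String, String> batch = new HashMap<>();     // uncommitted writes

    void put(String key, String value) {
        // Buffered in memory; not yet visible to committed readers.
        batch.put(key, value);
    }

    // Uncommitted read: check the batch first, then the underlying store.
    String get(String key) {
        return batch.containsKey(key) ? batch.get(key) : committed.get(key);
    }

    // Committed read: skip the batch entirely.
    String getCommitted(String key) {
        return committed.get(key);
    }

    // Apply the whole batch at once; a crash before this point loses only
    // uncommitted writes, never a partial batch.
    void commit() {
        committed.putAll(batch);
        batch.clear();
    }
}
```

Note that this sketch only shows the store-local atomicity; it does not by itself answer Guozhang's question about atomicity between the Kafka transaction commit and the store commit.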
Could you please point me to the place in the codebase where a task flushes the store before committing the transaction? Looking at TaskExecutor (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java#L144-L167), StreamTask#prepareCommit (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L398), and CachedStateStore (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/state/internals/CachedStateStore.java#L29-L34), we flush the cache, but not the underlying state store. The explicit StateStore#flush happens in AbstractTask#maybeWriteCheckpoint (https://github.com/apache/kafka/blob/4c9eeef5b2dff9a4f0977fbc5ac7eaaf930d0d0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L91-L99). Is there something I am missing here?

> Today all cached data that have not been flushed are not committed for sure, but even flushed data to the persistent underlying store may also be uncommitted since flushing can be triggered asynchronously before the commit.

Can you please point me to the place in the codebase where we trigger an async flush before the commit? This would certainly be a reason to introduce a dedicated StateStore#commit method.

Thanks again for the feedback. I am going to update the KIP and then respond to the next batch of questions and suggestions.

Best,
Alex

On Mon, May 30, 2022 at 5:13 PM Suhas Satish <ssat...@confluent.io.invalid> wrote:

> Thanks for the KIP proposal, Alex.
>
> 1.
> Configuration default
>
> You mention that applications using the Streams DSL with the built-in RocksDB state store will get transactional state stores by default when EOS is enabled, but the default implementation for apps using the PAPI will fall back to non-transactional behavior. Shouldn't we have the same default behavior for both types of apps, DSL and PAPI?
>
> On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:
>
> > Thanks for the PR, Alex!
> >
> > I am also glad to see this coming.
> >
> > 1. Configuration
> >
> > I would also prefer to restrict the configuration of transactionality to the state store. Ideally, calling the method transactional() on the state store would be enough. An option on the store builder would make it possible to turn transactionality on and off (as John proposed).
> >
> > 2. Memory usage in RocksDB
> >
> > This seems to be a major issue. We do not have any guarantee that uncommitted writes fit into memory, and I guess we never will. What happens when the uncommitted writes do not fit into memory? Does RocksDB throw an exception? Can we handle such an exception without crashing?
> >
> > Does the RocksDB behavior even need to be included in this KIP? In the end it is an implementation detail.
> >
> > What we should consider, though, is a memory limit in some form, and what we do when the memory limit is exceeded.
> >
> > 3. PoC
> >
> > I agree with Guozhang that a PoC is a good idea to better understand the devils in the details.
> >
> > Best,
> > Bruno
> >
> > On 25.05.22 01:52, Guozhang Wang wrote:
> > > Hello Alex,
> > >
> > > Thanks for writing the proposal! Glad to see it coming.
> > > I think this is the kind of KIP where too many devils are buried in the details, and it's better to start working on a POC, either in parallel or before we resume our discussion, rather than blocking any implementation until we are satisfied with the proposal.
> > >
> > > Just as a concrete example, I personally am still not 100% clear how the proposal would work to achieve EOS with the state stores. For example, the commit procedure today looks like this:
> > >
> > > 0. There's an existing checkpoint file indicating that the changelog offset of the local state store image is 100. Now a commit is triggered:
> > > 1. Flush the cache (since it contains partially processed records), making sure all records are written to the producer.
> > > 2. Flush the producer, making sure all changelog records have now been acked. // here we would get the new changelog position, say 200
> > > 3. Flush the store, making sure all writes are persisted.
> > > 4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // we would make the writes in the changelog up to offset 200 committed
> > > 5. Update the checkpoint file to 200.
> > >
> > > The question is about atomicity between those lines, for example:
> > >
> > > * If we crash between line 4 and line 5, the local checkpoint file would stay at 100, and upon recovery we would replay the changelog from 100 to 200. This is not ideal but does not violate EOS, since the changelog records are all overwrites anyway.
> > > * If we crash between line 3 and line 4, then at that time the local persistent store image represents offset 200, but upon recovery all changelog records from 100 to the log-end-offset would be considered aborted and not be replayed, and we would restart processing from position 100. Restarting processing would violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that steps 4 and 5 could be done atomically here.
> > >
> > > Originally, what I was thinking when creating the JIRA ticket was that we need the state store to provide a transactional API like "token commit()", used in step 4 above, which returns a token that, in our example, indicates offset 200; that token would be written as part of the records in the Kafka transaction in step 5. And upon recovery the state store would have another API like "rollback(token)", where the token is read from the latest committed txn and used to roll the store back to that committed image. I think your proposal is different, and it seems like you're proposing we swap steps 3 and 4 above, but the atomicity issue still remains, since now you may have the store image at 100 but the changelog committed at 200. I'd like to learn more about the details of how it resolves such issues.
> > >
> > > Anyway, that's just an example to make the point that there are lots of implementation details which would drive the public API design, and we should probably first do a POC and come back to discuss the KIP. Let me know what you think?
> > >
> > > Guozhang
> > >
> > > On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >
> > >> Hi Alexander,
> > >>
> > >> Thanks for the KIP! This seems like a great proposal. I have the same opinion as John on the Configuration part, though. I think the 2-level config and its behaviour based on the setting/unsetting of the flag seems confusing to me as well. Since the KIP seems specifically centred around RocksDB, it might be better to add it at the Supplier level as John suggested.
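To make the supplier-level opt-in concrete, here is one possible shape. All names here are hypothetical illustrations, not the KIP's final API or the actual Kafka Streams classes.

```java
// Hypothetical sketch of a supplier-level transactionality option.
// The enum and method names are illustrative assumptions only.
enum RocksDBTransactionalMechanism {
    NONE,                   // plain, non-transactional store
    WRITE_BATCH_WITH_INDEX  // buffer uncommitted writes in a WriteBatchWithIndex
}

class KeyValueStoreSupplierSketch {
    private final String name;
    private final RocksDBTransactionalMechanism mechanism;

    KeyValueStoreSupplierSketch(String name, RocksDBTransactionalMechanism mechanism) {
        this.name = name;
        this.mechanism = mechanism;
    }

    // Per-store opt-in: only suppliers that can support transactions expose this.
    boolean transactional() {
        return mechanism != RocksDBTransactionalMechanism.NONE;
    }

    String name() {
        return name;
    }
}
```

A per-store option like this avoids a topology-wide mechanism config: each store declares its own mechanism, so mixing transactional RocksDB stores with non-transactional or in-memory stores needs no special casing.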
> > >>
> > >> On similar lines, this config name => *statestore.transactional.mechanism* may also need rethinking, as the value assigned to it (rocksdb_indexbatch) implicitly seems to assume that RocksDB is the only state store that Kafka Streams supports, while that's not the case.
> > >>
> > >> Also, regarding the potential memory pressure that can be introduced by WriteBatchIndex, do you think it might make more sense to include some numbers/benchmarks on how much the memory consumption might increase?
> > >>
> > >> Lastly, the read_uncommitted flag's behaviour on IQ may need more elaboration.
> > >>
> > >> These points aside, as I said, this is a great proposal!
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> > >> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
> > >>
> > >>> Thanks for the KIP, Alex!
> > >>>
> > >>> I'm really happy to see your proposal. This improvement fills a long-standing gap.
> > >>>
> > >>> I have a few questions:
> > >>>
> > >>> 1. Configuration
> > >>> The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores. It is also common to use multiple types of state stores in the same application for different purposes.
> > >>>
> > >>> Against this backdrop, the choice to configure transactionality as a top-level config, as well as to configure the store transaction mechanism as a top-level config, seems a bit off.
> > >>>
> > >>> Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores? It seems like the desire to enable the feature by default, but with a feature flag to disable it, was a factor here.
> > >>> However, as you pointed out, there are some major considerations that users should be aware of, so opt-in doesn't seem like a bad choice, either. You could add an Enum argument to those factories like `RocksDBTransactionalMechanism.{NONE,
> > >>>
> > >>> Some points in favor of this approach:
> > >>> * Avoids the "stores that don't support transactions ignore the config" complexity
> > >>> * Users can choose how to spend their memory budget, making some stores transactional and others not
> > >>> * When we add transactional support to in-memory stores, we don't have to figure out what to do with the mechanism config (i.e., what do you set the mechanism to when there are multiple kinds of transactional stores in the topology?)
> > >>>
> > >>> 2. Caching/flushing/transactions
> > >>> The coupling between memory usage and flushing that you mentioned is a bit troubling. It also occurs to me that there seems to be some relationship with the existing record cache, which is also an in-memory holding area for records that are not yet written to the cache and/or store (albeit with no particular semantics). Have you considered how all these components should relate? For example, should a "full" WriteBatch actually trigger a flush so that we don't get OOMEs? If the proposed transactional mechanism forces all uncommitted writes to be buffered in memory until a commit, then what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?
> > >>>
> > >>> 3. ALOS
> > >>> You mentioned that a transactional store can help reduce duplication in the case of ALOS. We might want to be careful about claims like that.
> > >>> Duplication isn't the way that repeated processing manifests in state stores. Rather, it is in the form of dirty reads during reprocessing. This feature may reduce the incidence of dirty reads during reprocessing, but not in a predictable way. During regular processing today, we will send some records through to the changelog in between commit intervals. Under ALOS, if any of those dirty writes gets committed to the changelog topic, then upon failure we have to roll the store forward to them anyway, regardless of this new transactional mechanism. That's a fixable problem, by the way, but this KIP doesn't seem to fix it. I wonder if we should make any claims about the relationship of this feature to ALOS if the real-world behavior is so complex.
> > >>>
> > >>> 4. IQ
> > >>> As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2? Certainly, it seems strange only to propose a change for IQv1 and not v2.
> > >>>
> > >>> Regarding your proposal for IQv1, I'm unsure what the behavior should be for readCommitted, since the current behavior also reads out of the RecordCache. I guess if readCommitted==false, then we will continue to read from the cache first, then the Batch, then the store; and if readCommitted==true, we would skip the cache and the Batch and only read from the persistent RocksDB store?
> > >>>
> > >>> What should IQ do if I request readCommitted on a non-transactional store?
> > >>>
> > >>> Thanks again for proposing the KIP, and my apologies for the long reply; I'm hoping to air all my concerns in one "batch" to save time for you.
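The readCommitted lookup order described above can be sketched as follows. This is a toy illustration; the layer names are assumptions, not actual Kafka Streams classes.

```java
import java.util.List;

// Toy sketch of the proposed IQv1 lookup order: with readCommitted==false,
// consult the record cache, then the uncommitted WriteBatch, then RocksDB;
// with readCommitted==true, skip straight to the persistent store.
// Layer names are illustrative only.
class ReadPathSketch {
    static List<String> lookupOrder(boolean readCommitted) {
        return readCommitted
                ? List.of("rocksdb")
                : List.of("record-cache", "write-batch", "rocksdb");
    }
}
```

Under this model, a non-transactional store has an always-empty "write-batch" layer, which matches the answer given earlier in the thread: such stores commit on write, so both settings of readCommitted observe the same data.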
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> > >>>> Hi all,
> > >>>>
> > >>>> I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion:
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> > >>>>
> > >>>> Best,
> > >>>> Alex
>
> --
> Suhas Satish
> Engineering Manager, Confluent