Thanks for the KIP proposal, Alex.

1. Configuration default

You mention that applications using the Streams DSL with the built-in RocksDB state store will get transactional state stores by default when EOS is enabled, but that the default implementation for apps using the PAPI will fall back to non-transactional behavior. Shouldn't we have the same default behavior for both types of apps, DSL and PAPI?
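[Editor's note: the asymmetry questioned above can be sketched as a small default-resolution rule. This is an illustrative model only; the names `ApiKind` and `resolveTransactionalDefault` are hypothetical and not part of the KIP.]

```java
// Illustrative model of the defaults as described in the question above;
// all names here are made up, not proposed KIP API.
public class TransactionalDefaults {
    enum ApiKind { DSL, PAPI }

    // Per the KIP as described: DSL apps with the built-in RocksDB store
    // become transactional by default under EOS; PAPI apps do not.
    static boolean resolveTransactionalDefault(ApiKind api, boolean eosEnabled) {
        return eosEnabled && api == ApiKind.DSL;
    }

    public static void main(String[] args) {
        System.out.println(resolveTransactionalDefault(ApiKind.DSL, true));   // true
        System.out.println(resolveTransactionalDefault(ApiKind.PAPI, true));  // false
        System.out.println(resolveTransactionalDefault(ApiKind.DSL, false));  // false
    }
}
```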
On Mon, May 30, 2022 at 2:11 AM Bruno Cadonna <cado...@apache.org> wrote:

> Thanks for the PR, Alex!
>
> I am also glad to see this coming.
>
> 1. Configuration
>
> I would also prefer to restrict the configuration of transactionality to the state store. Ideally, calling the method transactional() on the state store would be enough. An option on the store builder would make it possible to turn transactionality on and off (as John proposed).
>
> 2. Memory usage in RocksDB
>
> This seems to be a major issue. We do not have any guarantee that uncommitted writes fit into memory, and I guess we never will. What happens when the uncommitted writes do not fit into memory? Does RocksDB throw an exception? Can we handle such an exception without crashing?
>
> Does the RocksDB behavior even need to be included in this KIP? In the end it is an implementation detail.
>
> What we should consider, though, is a memory limit in some form, and what we do when the memory limit is exceeded.
>
> 3. PoC
>
> I agree with Guozhang that a PoC is a good idea to better understand the devils in the details.
>
> Best,
> Bruno
>
> On 25.05.22 01:52, Guozhang Wang wrote:
> > Hello Alex,
> >
> > Thanks for writing the proposal! Glad to see it coming. I think this is the kind of KIP where too many devils would be buried in the details, so it's better to start working on a PoC, either in parallel or before we resume our discussion, rather than blocking any implementation until we are satisfied with the proposal.
> >
> > Just as a concrete example, I personally am still not 100% clear how the proposal would work to achieve EOS with the state stores. For example, the commit procedure today looks like this:
> >
> > 0. There is an existing checkpoint file indicating that the changelog offset of the local state store image is 100. Now a commit is triggered:
> > 1. Flush the cache (since it contains partially processed records), making sure all records are written to the producer.
> > 2. Flush the producer, making sure all changelog records have now been acked. // here we would get the new changelog position, say 200
> > 3. Flush the store, making sure all writes are persisted.
> > 4. producer.sendOffsetsToTransaction(); producer.commitTransaction(); // we would make the writes in the changelog up to offset 200 committed
> > 5. Update the checkpoint file to 200.
> >
> > The question is about atomicity between those lines, for example:
> >
> > * If we crash between line 4 and line 5, the local checkpoint file would stay at 100, and upon recovery we would replay the changelog from 100 to 200. This is not ideal but does not violate EOS, since the changelog records are all overwrites anyway.
> > * If we crash between line 3 and line 4, then at that time the local persistent store image represents offset 200, but upon recovery all changelog records from 100 to the log-end-offset would be considered aborted and not replayed, and we would restart processing from position 100. Restarting processing would violate EOS. I'm not sure how e.g. RocksDB's WriteBatchWithIndex would make sure that steps 4 and 5 are done atomically here.
> >
> > Originally, what I was thinking when creating the JIRA ticket is that we need to let the state store provide a transactional API like "token commit()" used in step 4) above, which returns a token that, in our example above, would indicate offset 200, and that token would be written as part of the records in the Kafka transaction in step 5). And upon recovery, the state store would have another API like "rollback(token)", where the token is read from the latest committed txn and used to roll the store back to that committed image.
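[Editor's note: the token-based commit()/rollback(token) contract sketched above can be illustrated with a toy in-memory store. This is a stdlib-only sketch of the contract, not the proposed RocksDB/WriteBatchWithIndex implementation; all names here are made up.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of a "token commit()" / "rollback(token)" store contract.
// A real implementation would sit on a persistent store, not a HashMap.
public class TokenTxnStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long lastCommittedOffset = 100L; // step 0: checkpoint says 100

    void put(String k, String v) { uncommitted.put(k, v); }

    // commit(): atomically promote uncommitted writes and return a token
    // (here, the changelog offset) to be stored in the Kafka transaction.
    long commit(long newChangelogOffset) {
        committed.putAll(uncommitted);
        uncommitted.clear();
        lastCommittedOffset = newChangelogOffset;
        return newChangelogOffset;
    }

    // rollback(token): discard anything newer than the token read from the
    // latest committed Kafka transaction.
    void rollback(long token) {
        uncommitted.clear();
        // A real store would restore the image as of `token`; this toy can
        // only detect when a rollback of committed state would be needed.
        if (token < lastCommittedOffset) {
            throw new IllegalStateException("would need to roll store back to " + token);
        }
    }

    String get(String k) { return committed.get(k); }

    public static void main(String[] args) {
        TokenTxnStore store = new TokenTxnStore();
        store.put("a", "1");
        // Crash before the txn commits: uncommitted writes are discarded,
        // so the store image never runs ahead of the committed changelog.
        store.rollback(100L);
        System.out.println(store.get("a")); // null
        store.put("a", "1");
        long token = store.commit(200L);    // store image now "as of 200"
        System.out.println(token);          // 200
        System.out.println(store.get("a")); // 1
    }
}
```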
> > I think your proposal is different: it seems like you're proposing that we swap steps 3) and 4) above, but the atomicity issue still remains, since now you may have the store image at 100 while the changelog is committed at 200. I'd like to learn more about the details of how it resolves such issues.
> >
> > Anyway, that's just an example to make the point that there are lots of implementation details which would drive the public API design, and we should probably first do a PoC and then come back to discuss the KIP. Let me know what you think?
> >
> > Guozhang
> >
> > On Tue, May 24, 2022 at 10:35 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> >> Hi Alexander,
> >>
> >> Thanks for the KIP! This seems like a great proposal. I have the same opinion as John on the configuration part, though. The two-level config and its behaviour based on setting/unsetting the flag seem confusing to me as well. Since the KIP seems specifically centred around RocksDB, it might be better to add it at the Supplier level, as John suggested.
> >>
> >> On similar lines, the config name statestore.transactional.mechanism may also need rethinking, as the value assigned to it (rocksdb_indexbatch) implicitly seems to assume that RocksDB is the only state store that Kafka Streams supports, while that's not the case.
> >>
> >> Also, regarding the potential memory pressure that can be introduced by WriteBatchIndex, do you think it might make more sense to include some numbers/benchmarks on how much the memory consumption might increase?
> >>
> >> Lastly, the read_uncommitted flag's behaviour on IQ may need more elaboration.
> >>
> >> These points aside, as I said, this is a great proposal!
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
> >>
> >>> Thanks for the KIP, Alex!
> >>>
> >>> I'm really happy to see your proposal. This improvement fills a long-standing gap.
> >>>
> >>> I have a few questions:
> >>>
> >>> 1. Configuration
> >>>
> >>> The KIP only mentions RocksDB, but of course, Streams also ships with an InMemory store, and users also plug in their own custom state stores. It is also common to use multiple types of state stores in the same application for different purposes.
> >>>
> >>> Against this backdrop, the choice to configure transactionality as a top-level config, as well as to configure the store transaction mechanism as a top-level config, seems a bit off.
> >>>
> >>> Did you consider instead just adding the option to the RocksDB*StoreSupplier classes and the factories in Stores? It seems like the desire to enable the feature by default, but with a feature flag to disable it, was a factor here. However, as you pointed out, there are some major considerations that users should be aware of, so opt-in doesn't seem like a bad choice, either. You could add an Enum argument to those factories like `RocksDBTransactionalMechanism.{NONE,
> >>>
> >>> Some points in favor of this approach:
> >>> * Avoid "stores that don't support transactions ignore the config" complexity
> >>> * Users can choose how to spend their memory budget, making some stores transactional and others not
> >>> * When we add transactional support to in-memory stores, we don't have to figure out what to do with the mechanism config (i.e., what do you set the mechanism to when there are multiple kinds of transactional stores in the topology?)
> >>>
> >>> 2. Caching/flushing/transactions
> >>>
> >>> The coupling between memory usage and flushing that you mentioned is a bit troubling. It also occurs to me that there seems to be some relationship with the existing record cache, which is also an in-memory holding area for records that are not yet written to the cache and/or store (albeit with no particular semantics). Have you considered how all these components should relate? For example, should a "full" WriteBatch actually trigger a flush so that we don't get OOMEs? If the proposed transactional mechanism forces all uncommitted writes to be buffered in memory until a commit, then what is the advantage over just doing the same thing with the RecordCache and not introducing the WriteBatch at all?
> >>>
> >>> 3. ALOS
> >>>
> >>> You mentioned that a transactional store can help reduce duplication in the case of ALOS. We might want to be careful about claims like that. Duplication isn't the way that repeated processing manifests in state stores. Rather, it is in the form of dirty reads during reprocessing. This feature may reduce the incidence of dirty reads during reprocessing, but not in a predictable way. During regular processing today, we will send some records through to the changelog in between commit intervals. Under ALOS, if any of those dirty writes gets committed to the changelog topic, then upon failure, we have to roll the store forward to them anyway, regardless of this new transactional mechanism. That's a fixable problem, by the way, but this KIP doesn't seem to fix it. I wonder if we should make any claims about the relationship of this feature to ALOS if the real-world behavior is so complex.
> >>>
> >>> 4. IQ
> >>>
> >>> As a reminder, we have a new IQv2 mechanism now. Should we propose any changes to IQv1 to support this transactional mechanism, versus just proposing it for IQv2?
> >>> Certainly, it seems strange only to propose a change for IQv1 and not for v2.
> >>>
> >>> Regarding your proposal for IQv1, I'm unsure what the behavior should be for readCommitted, since the current behavior also reads out of the RecordCache. I guess if readCommitted==false, then we will continue to read from the cache first, then the Batch, then the store; and if readCommitted==true, we would skip the cache and the Batch and only read from the persistent RocksDB store?
> >>>
> >>> What should IQ do if I request readCommitted on a non-transactional store?
> >>>
> >>> Thanks again for proposing the KIP, and my apologies for the long reply; I'm hoping to air all my concerns in one "batch" to save time for you.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
> >>>> Hi all,
> >>>>
> >>>> I've written a KIP for making Kafka Streams state stores transactional and would like to start a discussion:
> >>>>
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
> >>>>
> >>>> Best,
> >>>> Alex

-- 
Suhas Satish
Engineering Manager, Confluent