[ https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-12549:
---------------------------------------

    Assignee:     (was: Alex Sorokoumov)

> Allow state stores to opt-in transactional support
> --------------------------------------------------
>
>                 Key: KAFKA-12549
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12549
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Right now Kafka Streams' EOS implementation does not make any assumptions
> about the state store's transactional support. Allowing the state stores to
> optionally provide transactional support can have multiple benefits. For
> example, we could add some APIs to the {{StateStore}} interface, such as
> {{beginTxn}}, {{commitTxn}} and {{abortTxn}}. The Streams library can
> determine whether these are supported via an additional
> {{boolean transactional()}} API, and if so, these APIs can be used under both
> ALOS and EOS as follows (otherwise Streams just falls back to the normal
> processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
> If the state stores allow Streams to do something like the above, we can have
> the following benefits:
> * Reduce the duplicated records upon crashes for ALOS (note this is still not
> EOS, but a middle ground where uncommitted data within a state store would
> not be retained if store.commitTxn failed).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes
> for EOS. E.g., if a crash failure happens after the streams commit completes
> but before store.commitTxn, we can instead just roll forward the transaction
> by replaying the changelog from the second most recent streams committed
> offset to the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see
> below for reasons).
> As for the implementation of these APIs, there are several options:
> * The state store itself has natural transaction features (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction, and upon
> `commitTxn` write the whole buffer as a batch to the underlying state store,
> or just drop the whole buffer upon aborting. Then for interactive queries,
> one can optionally query just the underlying store to see committed data
> only.
> * Use a separate store as the transient persistent buffer. Upon `beginTxn`
> create a new empty transient store, and upon `commitTxn` merge the store into
> the underlying store. The same applies to interactive queries over
> committed-only data. Compared with the option above, this has the benefit
> that there is no memory pressure even with long transactions, but it incurs
> more complexity / performance overhead from the separate persistent store.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
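
For illustration, the in-memory buffer option and the processing loop from the ticket could look roughly like the sketch below. This is only a sketch under assumptions: the ticket names {{beginTxn}}, {{commitTxn}}, {{abortTxn}} and {{boolean transactional()}}, but the {{TransactionalStore}} interface, its signatures, {{getCommitted}}, and the {{BufferedTransactionalStore}} class are hypothetical and are not part of the Kafka Streams {{StateStore}} API. A plain {{HashMap}} stands in for the underlying store, and deletes are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical opt-in API: method names follow the ticket, signatures are assumed.
interface TransactionalStore<K, V> {
    boolean transactional();
    void beginTxn();
    void put(K key, V value);
    V get(K key);           // sees staged writes of the open transaction
    V getCommitted(K key);  // "query committed data only"
    void commitTxn();
    void abortTxn();
}

// In-memory buffer option: puts are staged, then applied as one batch on commit.
final class BufferedTransactionalStore<K, V> implements TransactionalStore<K, V> {
    private final Map<K, V> underlying = new HashMap<>(); // stand-in for the real store
    private final Map<K, V> buffer = new HashMap<>();     // uncommitted writes
    private boolean open = false;

    @Override public boolean transactional() { return true; }

    @Override public void beginTxn() {
        if (open) throw new IllegalStateException("transaction already open");
        open = true;
    }

    @Override public void put(K key, V value) {
        requireOpen();
        // Deletes (null values) are out of scope for this sketch.
        buffer.put(key, Objects.requireNonNull(value));
    }

    @Override public V get(K key) {
        V staged = buffer.get(key);
        return staged != null ? staged : underlying.get(key);
    }

    @Override public V getCommitted(K key) { return underlying.get(key); }

    @Override public void commitTxn() {
        requireOpen();
        underlying.putAll(buffer); // write the whole buffer as a batch
        buffer.clear();
        open = false;
    }

    @Override public void abortTxn() {
        requireOpen();
        buffer.clear();            // drop the whole buffer
        open = false;
    }

    private void requireOpen() {
        if (!open) throw new IllegalStateException("no open transaction");
    }
}

// Walks through steps 1-5 of the loop described in the ticket.
final class TxnLoopDemo {
    public static void main(String[] args) {
        TransactionalStore<String, Integer> store = new BufferedTransactionalStore<>();
        store.beginTxn();                               // 1. store.beginTxn
        store.put("k", 1);                              // 2. store.put during processing
        System.out.println(store.getCommitted("k"));    // prints "null": not committed yet
        // 3. the streams commit would happen here (omitted in this sketch)
        store.commitTxn();                              // 4. store.commitTxn
        System.out.println(store.getCommitted("k"));    // prints "1"
        store.beginTxn();                               // 5. start the next txn
        store.put("k", 2);
        store.abortTxn();                               // staged write is dropped
        System.out.println(store.get("k"));             // prints "1"
    }
}
```

The separate {{getCommitted}} read path is one way to expose the "query committed data only" behaviour to interactive queries; the trade-off noted in the ticket still applies, since the buffer grows with the length of the transaction.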