[ https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-12549.
-------------------------------------
Resolution: Duplicate
Closing this ticket in favor of KAFKA-14412.
> Allow state stores to opt-in transactional support
> --------------------------------------------------
>
> Key: KAFKA-12549
> URL: https://issues.apache.org/jira/browse/KAFKA-12549
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
>
> Right now Kafka Streams' EOS implementation does not make any assumptions
> about the state store's transactional support. Allowing state stores to
> optionally provide transactional support can have multiple benefits. E.g., we
> could add APIs such as {{beginTxn}}, {{commitTxn}} and {{abortTxn}} to the
> {{StateStore}} interface. The Streams library can determine whether they are
> supported via an additional {{boolean transactional()}} API. If they are,
> these APIs can be used under both ALOS and EOS as follows (otherwise Streams
> just falls back to the normal processing logic):
> Within thread processing loops:
> 1. store.beginTxn
> 2. store.put // during processing
> 3. streams commit // either through eos protocol or not
> 4. store.commitTxn
> 5. start the next txn by store.beginTxn
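The loop above can be sketched in Java as follows. This is a minimal sketch under stated assumptions. {{TxnStateStore}} and {{ProcessingLoop}} are hypothetical names, and their methods mirror the names proposed in this ticket, not the real {{StateStore}} interface. A plain {{Runnable}} stands in for the Streams commit, which may be either EOS or ALOS.

```java
// Hypothetical opt-in API mirroring the names proposed in this ticket;
// this is NOT the real Kafka Streams StateStore interface.
interface TxnStateStore {
    void put(String key, String value);
    String get(String key);

    // Stores that do not override this are treated as non-transactional.
    default boolean transactional() { return false; }
    default void beginTxn() {}
    default void commitTxn() {}
    default void abortTxn() {}
}

// Hypothetical driver for one stream thread's processing loop.
final class ProcessingLoop {
    private final TxnStateStore store;
    private final Runnable streamsCommit; // stands in for the EOS or ALOS commit

    ProcessingLoop(TxnStateStore store, Runnable streamsCommit) {
        this.store = store;
        this.streamsCommit = streamsCommit;
    }

    void start() {
        if (store.transactional()) store.beginTxn();   // 1. begin first txn
    }

    void process(String key, String value) {
        store.put(key, value);                          // 2. writes during processing
    }

    void commit() {
        streamsCommit.run();                            // 3. streams commit
        if (store.transactional()) {
            store.commitTxn();                          // 4. commit the store txn
            store.beginTxn();                           // 5. start the next txn
        }
    }
}
```

Using default methods keeps existing stores source-compatible. A store that does not override {{transactional()}} simply follows the normal, non-transactional path.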
> If the state stores allow Streams to do something like the above, we get the
> following benefits:
> * Fewer duplicated records upon crashes under ALOS (note this is still not
> EOS, but a middle ground in which uncommitted data within a state store is
> not retained if store.commitTxn fails).
> * No need to wipe the state store and re-bootstrap from scratch upon crashes
> for EOS. E.g., if a crash happens after the streams commit completes but
> before store.commitTxn, we can instead roll the transaction forward by
> replaying the changelog from the second most recent committed offset up to
> the most recent committed offset.
> * Remote stores that support txn then do not need to support wiping
> (https://issues.apache.org/jira/browse/KAFKA-12475).
> * We can fix the known issues of emit-on-change
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
> * We can support "query committed data only" for interactive queries (see
> below for reasons).
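The roll-forward recovery from the EOS bullet above can be sketched as a bounded changelog replay. This is a sketch under stated assumptions. {{ChangelogRecord}} and {{RollForward}} are hypothetical names, and a {{Map}} stands in for the state store. Offsets follow Kafka's convention that a committed offset is the next offset to read, so the store already reflects everything before {{fromOffset}}. A {{null}} value is treated as a tombstone (delete).

```java
import java.util.List;
import java.util.Map;

// Hypothetical changelog record; in Kafka this would come from a consumer.
record ChangelogRecord(long offset, String key, String value) {}

final class RollForward {
    // Replays changelog records with offsets in [fromOffset, toOffset) into the
    // store. fromOffset is the second most recent committed offset (already
    // reflected in the store), toOffset the most recent committed offset.
    // Returns the number of records applied.
    static int replay(List<ChangelogRecord> changelog, Map<String, String> store,
                      long fromOffset, long toOffset) {
        int applied = 0;
        for (ChangelogRecord r : changelog) {
            if (r.offset() < fromOffset || r.offset() >= toOffset) continue;
            if (r.value() == null) {
                store.remove(r.key());      // tombstone: delete the key
            } else {
                store.put(r.key(), r.value());
            }
            applied++;
        }
        return applied;
    }
}
```

Only the records between the two commits are replayed, instead of the whole changelog, which is what makes wiping the store unnecessary.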
> As for the implementation of these APIs, there are several options:
> * The state store itself has native transaction support (e.g. RocksDB).
> * Use an in-memory buffer for all puts within a transaction: upon
> `commitTxn`, write the whole buffer as a batch to the underlying state store,
> or simply drop the whole buffer upon abort. Interactive queries can then
> optionally read only committed data by querying the underlying store alone.
> * Use a separate store as a transient persistent buffer: upon `beginTxn`,
> create a new empty transient store, and upon `commitTxn`, merge it into the
> underlying store. The same applies to interactive queries for committed-only
> data. Compared with the option above, this avoids memory pressure even with
> long transactions, but incurs more complexity and performance overhead from
> the separate persistent store.
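The in-memory buffer option can be sketched as follows. This is a minimal sketch under stated assumptions. {{BufferedTxnStore}} and {{getCommitted}} are hypothetical names, and a {{HashMap}} stands in for the underlying store (e.g. RocksDB), so the "batch" write on commit is modeled as a single {{putAll}}. The separate-store option would replace the {{HashMap}} buffer with a transient persistent store while keeping the same begin/commit/abort shape.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "in-memory buffer" option: uncommitted writes live in a
// per-transaction buffer and reach the underlying store only on commitTxn.
final class BufferedTxnStore {
    private final Map<String, String> committed = new HashMap<>(); // underlying store
    private Map<String, String> buffer = null; // null when no txn is open

    boolean transactional() { return true; }

    void beginTxn() {
        if (buffer != null) throw new IllegalStateException("transaction already open");
        buffer = new HashMap<>();
    }

    void put(String key, String value) {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        buffer.put(key, value);
    }

    // Read path for processing: sees the transaction's own uncommitted writes.
    String get(String key) {
        if (buffer != null && buffer.containsKey(key)) return buffer.get(key);
        return committed.get(key);
    }

    // Read path for interactive queries that want committed data only.
    String getCommitted(String key) { return committed.get(key); }

    void commitTxn() {
        if (buffer == null) throw new IllegalStateException("no open transaction");
        committed.putAll(buffer); // a real store would write this as one atomic batch
        buffer = null;
    }

    void abortTxn() {
        buffer = null; // drop all uncommitted writes
    }
}
```

Because the buffer is held entirely in memory, a long transaction grows it without bound. That is the memory pressure the separate-store option is meant to avoid.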