[ 
https://issues.apache.org/jira/browse/KAFKA-12549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12549:
----------------------------------
    Description: 
Right now Kafka Streams' EOS implementation does not make any assumptions about 
the state store's transactional support. Allowing state stores to optionally 
provide transactional support can have multiple benefits. E.g., we could add 
APIs such as {{beginTxn}}, {{commitTxn}} and {{abortTxn}} to the {{StateStore}} 
interface. The Streams library could then determine whether they are supported 
via an additional {{boolean transactional()}} API, and if so these APIs can be 
used under both ALOS and EOS as in the following (otherwise it just falls back 
to the normal processing logic):
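
As a minimal sketch only, such an opt-in store interface might look roughly 
like the following; the method names follow the description above, but none of 
this is existing Kafka code, and the plain put()/get() methods are included 
purely for illustration:

{code:java}
// Hypothetical sketch only; these methods do not exist in Kafka today.
// In the proposal they would be added to the StateStore interface itself.
public interface TransactionalKeyValueStore<K, V> {

    // Opt-in flag: Streams would fall back to today's non-transactional
    // path for stores that return false.
    default boolean transactional() {
        return false;
    }

    void beginTxn();   // start buffering writes in a store-local transaction
    void commitTxn();  // atomically publish all writes since beginTxn()
    void abortTxn();   // discard all writes since beginTxn()

    void put(K key, V value);
    V get(K key);
}
{code}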

Within the thread's processing loop:
1. store.beginTxn
2. store.put  // during processing
3. streams commit  // either through the EOS protocol or not
4. store.commitTxn
5. start the next txn with store.beginTxn
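
A rough sketch of how this cycle could look in code, using the hypothetical 
interface above; the batch, the streamsCommit callback and the class name are 
placeholders standing in for the real Streams task internals:

{code:java}
import java.util.Map;

// Sketch of one processing/commit cycle under the proposed API.
final class ProcessingCycleSketch {
    static <K, V> void processingCycle(TransactionalKeyValueStore<K, V> store,
                                       Iterable<Map.Entry<K, V>> batch,
                                       Runnable streamsCommit) {
        store.beginTxn();                                   // 1. open a store-local txn
        for (Map.Entry<K, V> record : batch) {
            store.put(record.getKey(), record.getValue());  // 2. writes during processing
        }
        streamsCommit.run();                                // 3. Streams commit (EOS or not)
        store.commitTxn();                                  // 4. publish the buffered writes
        store.beginTxn();                                   // 5. start the next transaction
    }
}
{code}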

If the state stores allow Streams to do something like the above, we get the 
following benefits:
* Fewer duplicated records upon crashes for ALOS (note this is still not EOS, 
but a middle ground where uncommitted data within a state store would not be 
retained if store.commitTxn did not complete).
* No need to wipe the state store and re-bootstrap from scratch upon crashes 
for EOS. E.g., if a crash happens after the Streams commit completes but before 
store.commitTxn, we can instead roll the transaction forward by replaying the 
changelog from the second-most-recent committed offset up to the most recent 
committed offset (see the sketch after this list).
* Remote stores that support transactions would then not need to support 
wiping (https://issues.apache.org/jira/browse/KAFKA-12475).
* We can fix the known issues of emit-on-change 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams).
* We can support "query committed data only" for interactive queries (see below 
for reasons).
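
To illustrate the roll-forward idea from the EOS bullet above, here is a 
hedged sketch; the checkpointed/committed offsets and the readChangelog helper 
are hypothetical placeholders, not existing Streams APIs:

{code:java}
import java.util.Map;
import java.util.function.Function;

// Sketch of roll-forward recovery after a crash between the Streams commit
// and store.commitTxn(): replay only the changelog gap instead of wiping
// the store and re-bootstrapping from scratch.
final class RollForwardSketch {
    static <K, V> void rollForward(TransactionalKeyValueStore<K, V> store,
                                   long checkpointedOffset,  // last offset applied to the store
                                   long committedOffset,     // most recent Streams-committed offset
                                   Function<Long, Map.Entry<K, V>> readChangelog) {
        store.beginTxn();
        for (long offset = checkpointedOffset + 1; offset <= committedOffset; offset++) {
            Map.Entry<K, V> record = readChangelog.apply(offset);
            store.put(record.getKey(), record.getValue());
        }
        store.commitTxn();  // the store is now aligned with the committed offset
    }
}
{code}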

As for the implementation of these APIs, there are several options:
* The state store itself has native transaction features (e.g. RocksDB).
* Use an in-memory buffer for all puts within a transaction; upon `commitTxn` 
write the whole buffer as a single batch to the underlying state store, or 
drop the whole buffer upon aborting. For interactive queries, one can then 
optionally query only the underlying store to see committed data only (see the 
sketch after this list).
* Use a separate store as a transient persistent buffer. Upon `beginTxn` 
create a new empty transient store, and upon `commitTxn` merge it into the 
underlying store. The same applies for interactively querying committed-only 
data. Compared with the option above, this avoids memory pressure even with 
long transactions, but incurs more complexity / performance overhead due to 
the separate persistent store.
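
As a sketch of the in-memory buffer option (the second bullet), layered on the 
hypothetical interface shown earlier: the underlying store is modeled here as 
a plain Map, and a real implementation would also need to handle deletes, 
range queries and crash recovery.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of option 2: buffer all puts in memory, flush them to the
// underlying store as one batch on commitTxn(), or drop them on abortTxn().
final class InMemoryTxnStoreSketch<K, V> implements TransactionalKeyValueStore<K, V> {
    private final Map<K, V> underlying;                      // committed data
    private final Map<K, V> buffer = new LinkedHashMap<>();  // uncommitted writes

    InMemoryTxnStoreSketch(Map<K, V> underlying) {
        this.underlying = underlying;
    }

    @Override public boolean transactional() { return true; }

    @Override public void beginTxn() { buffer.clear(); }

    @Override public void commitTxn() {
        underlying.putAll(buffer);  // write the whole buffer as a batch
        buffer.clear();
    }

    @Override public void abortTxn() { buffer.clear(); }

    @Override public void put(K key, V value) { buffer.put(key, value); }

    @Override public V get(K key) {
        // Uncommitted reads see the buffer first; "query committed data only"
        // for interactive queries would read the underlying store directly.
        return buffer.containsKey(key) ? buffer.get(key) : underlying.get(key);
    }
}
{code}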



> Allow state stores to opt-in transactional support
> --------------------------------------------------
>
>                 Key: KAFKA-12549
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12549
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
