[ https://issues.apache.org/jira/browse/KAFKA-8870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16922705#comment-16922705 ]

Vinoth Chandar edited comment on KAFKA-8870 at 9/4/19 5:40 PM:
---------------------------------------------------------------

Initial thoughts from [~mjsax], [~guozhang], and me below:

Approach 1:

We create a “staging in-memory buffer” for each state store – each write to the 
staging buffer is also written to the store's changelog topic. After a 
transaction is committed, we replay/merge all buffered writes into the state 
store to expose them to IQ. During processing, the Processors also read 
from the “staging buffer” to ensure read-your-own-writes semantics.
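
As a rough sketch of that write/read path (all names here, e.g. 
StagingStoreSketch and ChangelogWriter, are illustrative placeholders, not 
existing Streams APIs):

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Hypothetical write-through staging wrapper. Uncommitted writes live in an
// in-memory buffer and are also sent to the changelog topic (inside the open
// Kafka transaction); reads consult the buffer first, so a Processor always
// sees its own uncommitted writes while IQ only sees the committed store.
class StagingStoreSketch {
    interface ChangelogWriter { void send(String key, byte[] value); }

    private final Map<String, byte[]> stagingBuffer = new TreeMap<>();
    private final Map<String, byte[]> committedStore = new TreeMap<>(); // stand-in for RocksDB
    private final ChangelogWriter changelog;

    StagingStoreSketch(final ChangelogWriter changelog) {
        this.changelog = changelog;
    }

    void put(final String key, final byte[] value) {
        stagingBuffer.put(key, value); // not visible to IQ yet
        changelog.send(key, value);    // rides along in the open Kafka transaction
    }

    // Processor read path: the staging buffer shadows the committed store,
    // including tombstones (null values), for read-your-own-writes.
    byte[] get(final String key) {
        return stagingBuffer.containsKey(key)
                ? stagingBuffer.get(key)
                : committedStore.get(key);
    }

    // IQ read path: only committed data.
    byte[] iqGet(final String key) {
        return committedStore.get(key);
    }
}
{code}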

Additionally, we re-introduce the local .checkpoint file. On commit, we first 
commit the Kafka transaction; second, merge the staging buffer into the local 
store; and third, update the checkpoint file with the changelog end-offsets. 
This algorithm ensures that the checkpoint file only indicates committed data. 
On failure, we only need to re-read the changelog topic from the checkpoint 
file offsets up to the log-end-offset in read-committed mode. If we crash and 
lose the staging-buffer updates, we will abort the transaction anyway. If we 
fail after the commit but before merging the staging buffer into the local 
state store, we know that all staging-buffer updates are already written to 
the changelog topic, and hence we use the changelog topic for recovery.
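
A sketch of that three-step commit ordering (KafkaTxn and CheckpointFile are 
placeholders standing in for the producer and checkpoint mechanics):

{code:java}
import java.util.Map;
import java.util.TreeMap;

// Sketch of the three-step commit ordering for Approach 1; the order of the
// steps is what makes every crash window recoverable.
class CommitSequenceSketch {
    interface KafkaTxn { void commitTransaction(); long changelogEndOffset(); }
    interface CheckpointFile { void write(long committedEndOffset); }

    private final Map<String, byte[]> stagingBuffer = new TreeMap<>();
    private final Map<String, byte[]> committedStore = new TreeMap<>();
    private final KafkaTxn txn;
    private final CheckpointFile checkpoint;

    CommitSequenceSketch(final KafkaTxn txn, final CheckpointFile checkpoint) {
        this.txn = txn;
        this.checkpoint = checkpoint;
    }

    void commit() {
        // Step 1: commit the Kafka transaction. After this returns, every
        // buffered write is durable in the changelog under read_committed.
        txn.commitTransaction();
        // Step 2: merge the staging buffer so IQ can see the new values.
        committedStore.putAll(stagingBuffer);
        stagingBuffer.clear();
        // Step 3: checkpoint last, so the file never claims uncommitted data.
        // A crash between steps 1 and 3 is safe: recovery replays the
        // changelog from the old checkpoint offset, which is idempotent.
        checkpoint.write(txn.changelogEndOffset());
    }
}
{code}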

 

Approach 2:

Implement the buffer using RocksDB transactions 
[https://github.com/facebook/rocksdb/wiki/Transactions], where we only commit 
the RocksDB transaction when Kafka commits. If we fail before committing to 
RocksDB, we restore from Kafka again anyway, since that is the source of 
truth. Approach 1 generalizes better across different state store 
implementations.
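
For reference, a minimal sketch of Approach 2 against rocksdbjni's 
TransactionDB; wiring txn.commit() to the Kafka commit, plus all options 
tuning and error handling, is the part Streams would have to add:

{code:java}
import java.nio.charset.StandardCharsets;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.WriteOptions;

// Writes are buffered in a RocksDB transaction that is committed only once
// the Kafka transaction commits; until then they are invisible to IQ readers.
public class RocksDbTxnSketch {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final TransactionDBOptions txnDbOptions = new TransactionDBOptions();
             final TransactionDB db =
                     TransactionDB.open(options, txnDbOptions, "/tmp/txn-store");
             final WriteOptions writeOptions = new WriteOptions();
             final Transaction txn = db.beginTransaction(writeOptions)) {

            txn.put("K".getBytes(StandardCharsets.UTF_8),
                    "V1".getBytes(StandardCharsets.UTF_8));
            // ... processing continues; txn.get(...) gives read-your-own-writes ...

            // Only after the Kafka producer's commitTransaction() succeeds:
            txn.commit();  // now V1 becomes visible to other readers / IQ
            // On abort we would txn.rollback() instead and restore from the
            // changelog, which remains the source of truth.
        }
    }
}
{code}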

 

Would like to pick up this work unless someone objects.

 


> Prevent dirty reads of Streams state store from Interactive queries
> -------------------------------------------------------------------
>
>                 Key: KAFKA-8870
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8870
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Vinoth Chandar
>            Priority: Major
>
> Today, Interactive Queries (IQ) against a Streams state store can see 
> uncommitted data, even with EOS processing guarantees (these are actually 
> orthogonal, but clarifying since EOS may give the impression that everything 
> is dandy). This is caused primarily because state updates in RocksDB are 
> visible even before the Kafka transaction is committed. Thus, if the instance 
> fails, the failed-over instance will redo the uncommitted old transaction, 
> and the following becomes possible during recovery:
> Value for key K can go from *V0 → V1 → V2* on active instance A. IQ reads V1; 
> instance A fails, and any failure/rebalancing will leave the standby instance 
> B rewinding offsets and reprocessing, during which time IQ can again see V0, 
> V1, or any number of previous values for the same key.
> In this issue, we plan to work towards providing consistency for IQ for a 
> single row in a single state store, i.e., once a query sees V1, it can only 
> see either V1 or V2.


