[ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355948#comment-15355948
 ] 

Guozhang Wang commented on KAFKA-3522:
--------------------------------------

@migno @imandhan [~jkreps] Here is my proposal:

1. Currently we keep an offset file under the task-id directory, i.e. 
{state.dir}/{application.id}/{task.id}, which is used both as the last 
committed offset of the state store and as an indicator that the task was 
shut down cleanly. More details:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client#KIP-28-Addaprocessorclient-LocalStateManagement

We can piggy-back the state store version information (for now we only have 
RocksDB, but may add more in the future) on this offset file, since we only 
upgrade to a new RocksDB storage format when we upgrade the Kafka Streams 
library version.

2. Upon restarting, when the stored version refers to an old storage 
version, delete the state store files and follow the restoration process as if 
the task had not been shut down cleanly.

In this case, the standby replica's store version may differ from that of the 
task's active replica, but I think this is OK: when the standby replica 
upgrades, it will also follow step 2 above and restore from the beginning.
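
To make steps 1 and 2 concrete, here is a rough sketch of what I have in mind. 
It is not the actual Kafka Streams code: the class name, the ".offset" file 
name, the two-line file layout (version, then offset) and the 
CURRENT_STORE_VERSION constant are all assumptions made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; none of these names come from the Kafka Streams code base.
class VersionedOffsetFile {
    // Assumption: bumped whenever a Streams release changes the store format.
    static final int CURRENT_STORE_VERSION = 2;
    // Assumption: illustrative file name inside the task directory.
    static final String FILE_NAME = ".offset";

    // Step 1: store the version on the first line and the offset on the second.
    static void write(Path taskDir, int storeVersion, long offset) throws IOException {
        Files.createDirectories(taskDir);
        List<String> lines = Arrays.asList(
                Integer.toString(storeVersion), Long.toString(offset));
        Files.write(taskDir.resolve(FILE_NAME), lines, StandardCharsets.UTF_8);
    }

    // Step 2: return the offset to resume from, or -1 if the local state was
    // wiped and the store must be restored from the beginning.
    static long offsetToResumeFrom(Path taskDir) throws IOException {
        Path file = taskDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            // No offset file: treat as an unclean shutdown.
            wipe(taskDir);
            return -1L;
        }
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        int storedVersion = Integer.parseInt(lines.get(0).trim());
        long offset = Long.parseLong(lines.get(1).trim());
        if (storedVersion != CURRENT_STORE_VERSION) {
            // Old storage format: handle it like an unclean shutdown.
            wipe(taskDir);
            return -1L;
        }
        return offset;
    }

    // Delete everything under the task directory, children before parents.
    static void wipe(Path taskDir) throws IOException {
        if (!Files.exists(taskDir)) {
            return;
        }
        List<Path> toDelete;
        try (Stream<Path> paths = Files.walk(taskDir)) {
            toDelete = paths.sorted(Comparator.reverseOrder())
                    .collect(Collectors.toList());
        }
        for (Path p : toDelete) {
            if (!p.equals(taskDir)) {
                Files.delete(p);
            }
        }
    }
}
```

A standby replica would run the same check when it restarts after its own 
upgrade, so no coordination with the active replica is needed.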

Ishita, as I think about it I realize this is actually related to the 
exactly-once semantics that we are currently working on. So could you hold off 
on this JIRA until the complete design of exactly-once is out?

> Consider adding version information into rocksDB storage format
> ---------------------------------------------------------------
>
>                 Key: KAFKA-3522
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3522
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Ishita Mandhan
>              Labels: architecture
>             Fix For: 0.10.1.0
>
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path in the future when we change the data format, and 
> hence having some version info stored along with the storage file / directory 
> would be useful.
> And this information could even live outside the storage file; for example, 
> we can just use a small "version indicator" file in the rocksdb directory 
> for this purpose. Thoughts? [~enothereska] [~jkreps]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
