[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Thanks for the clarification, [~roman]!
{quote}Further, regarding the approach of using a unique registry key, I agree
with Congxian Qiu: we can simply choose a stable registry key generation method
based on the remote file name (for example, the MD5 digest of the remote file
name), which can replace
IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName().
The mapping from local SST file name to StreamStateHandle never changes, so the
RocksDB recovery part does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in
_*IncrementalRemoteKeyedStateHandle*_, and use the remote file path only when
generating the SharedStateRegistryKey. The changes in
_*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...

// Still keyed by the local file name; this is the mapping that "never changes"
// as mentioned above.
private final Map<StateHandleID, StreamStateHandle> sharedState;

...


public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
            sharedState.entrySet()) {

        // Changed line: derive the registry key from the (remote) state handle
        // instead of from the local file name.
        SharedStateRegistryKey registryKey =
                generateRegisterKey(sharedStateHandle.getValue());

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString;
    if (stateHandle instanceof FileStateHandle) {
        // Use the remote file path of this upload.
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        // In-memory handle: use its handle name.
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        // Fallback: identity hash of the handle object.
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    // md5sum(...) is a digest helper not shown here; another digest algorithm would also work.
    return new SharedStateRegistryKey(md5sum(keyString));
}

{code}
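The generateRegisterKey sketch above calls an md5sum helper that the comment does not define. One minimal way to write it, assuming the key should be the lowercase hex MD5 digest of the UTF-8 bytes of the key string (the class name RegistryKeyDigest and these encoding choices are assumptions for illustration, not existing Flink API):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper class; not part of Flink. */
public final class RegistryKeyDigest {

    private RegistryKeyDigest() {}

    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of keyString. */
    public static String md5sum(String keyString) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(keyString.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to an unsigned value and always emit two hex digits per byte.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to support MD5, so this should not happen.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
{code}

Because the digest depends only on the key string, re-registering the same remote upload yields the same key, while two uploads of the same local SST file to different remote paths yield different keys.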

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
>                 Key: FLINK-29913
>                 URL: https://issues.apache.org/jira/browse/FLINK-29913
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0, 1.17.0
>            Reporter: Yanfei Lei
>            Assignee: Feifan Wang
>            Priority: Major
>             Fix For: 1.16.2, 1.17.2
>
>
> When maxConcurrentCheckpoint > 1, the shared state of the incremental RocksDB state
> backend could be discarded when a handle with the same name is registered again. See
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman] 
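To make the failure mode in the issue description concrete, here is a hypothetical, heavily simplified model. It is NOT Flink's SharedStateRegistryImpl; it only assumes the rule "a different handle registered under an existing key replaces the old one, and the old one is discarded". The local file name 000042.sst and the remote paths are made up for illustration.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy registry; a simplification, not Flink's implementation. */
public final class ToySharedRegistry {

    private final Map<String, String> entries = new HashMap<>(); // registry key -> remote path
    private final List<String> discarded = new ArrayList<>();    // remote paths scheduled for deletion

    void register(String key, String remotePath) {
        String previous = entries.put(key, remotePath);
        if (previous != null && !previous.equals(remotePath)) {
            // Assumed replacement rule: the earlier upload is dropped.
            discarded.add(previous);
        }
    }

    /**
     * Two concurrent checkpoints upload the same local SST file, each to its own
     * remote path, and register it. Returns the remote paths that were discarded.
     */
    public static List<String> runScenario(boolean keyByRemotePath) {
        ToySharedRegistry registry = new ToySharedRegistry();
        String localName = "000042.sst";
        String[] uploads = {
            "s3://bucket/shared/uuid-a", // uploaded by checkpoint 1
            "s3://bucket/shared/uuid-b"  // uploaded by checkpoint 2
        };
        for (String remotePath : uploads) {
            String key = keyByRemotePath ? remotePath : localName;
            registry.register(key, remotePath);
        }
        return registry.discarded;
    }

    public static void main(String[] args) {
        System.out.println("keyed by local name:  discarded " + runScenario(false));
        System.out.println("keyed by remote path: discarded " + runScenario(true));
    }
}
{code}

Keyed by the local file name, the second registration discards uuid-a even though checkpoint 1 still references it; keyed by the remote path, the two uploads never collide.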



--
This message was sent by Atlassian Jira
(v8.20.10#820010)