[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751329#comment-17751329 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

javadoc updated in
c7d3e36053091b7d7b10e89c5f9dba3df46ec0bd (master)
f2d3ca0bf4d49621450c7e71c1bd7c9de2deb89b (1.16)
e213267ebf691f1e959be6f5154a790c2ec6a2e9 (1.17)

> Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
> -------------------------------------------------------------------------
>
>                 Key: FLINK-29913
>                 URL: https://issues.apache.org/jira/browse/FLINK-29913
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0, 1.17.0
>            Reporter: Yanfei Lei
>            Assignee: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.18.0, 1.16.3, 1.17.2
>
>
> When maxConcurrentCheckpoint>1, the shared state of Incremental rocksdb state
> backend would be discarded by registering the same name handle. See
> [https://github.com/apache/flink/pull/21050#discussion_r1011061072]
> cc [~roman]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751308#comment-17751308 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Thanks a lot [~Feifan Wang], happy to hear that :)

I've merged the backports to 1.16 as aad658910fd6967199bce69f33872d245315bca1..c6c54412bf70af4432071dfdf131b62ff8236fe7
and to 1.17 as 65e7004dd32633f9dfc87b0808ffcb587daf525c..f3d212956660a77d58ab99f4de6611760e5f0b9a.

I've taken a look at the javadoc update, thanks for noticing this (y)
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751304#comment-17751304 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Hi [~roman],

In addition to the backport PRs for [1.16|https://github.com/apache/flink/pull/23137] and [1.17|https://github.com/apache/flink/pull/23139], I also submitted [a PR to fix the outdated Javadoc in SharedStateRegistry|https://github.com/apache/flink/pull/23147], PTAL.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1775#comment-1775 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

[~roman], I submitted two PRs for [1.16|https://github.com/apache/flink/pull/23137] and [1.17|https://github.com/apache/flink/pull/23139], please take a look.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750647#comment-17750647 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Thanks to [~roman] for helping to review the PR, I learned a lot from it.
{quote}do you mind creating backport PRs for branches 1.17 and 1.16?
{quote}
I am glad to.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750636#comment-17750636 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Merged into master as 6dcb10abf319b9e5494a82bee71f1ae1a4e4b211 ... 85f32d6bcb31708c9c2c845ea03ef117726b7c1a.

[~Feifan Wang] do you mind creating backport PRs for branches 1.17 and 1.16?
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726933#comment-17726933 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Thanks for the information [~klion26]. I would like to add one more case where I recently noticed a possible problem:

Since 1.17, users can trigger a checkpoint with an assigned checkpoint type (CONFIGURED, FULL, INCREMENTAL). If a user triggers a FULL checkpoint via the REST API, this forces a re-upload of snapshot files that have already been uploaded. A register key based on the remote file path is still valid in this case.

I have submitted a PR, please help to review it [~roman], [~klion26].
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726084#comment-17726084 ]

Congxian Qiu commented on FLINK-29913:
--------------------------------------

Thanks for the discussion above and the contribution!

Using the UUID/filename as the key solves the problem here, and it also makes sense because the key and the remote file are one-to-one.

In addition, it can solve some other potential problems. For example, if a Flink job management platform uses the SharedRegistry to maintain the checkpoint lifecycle, and a task has two sst files with the same name, the file is currently deleted by mistake. This situation occurs as follows: job A generates a checkpoint chk1, then stops; job B resumes from chk1, completes chk2, then stops; job C resumes from chk1 and completes chk3. If we then register chk2 and chk3 in one SharedRegistry, some remote files are deleted by mistake, because chk2 and chk3 contain sst files with the same name.
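The chk2/chk3 scenario in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyRegistry` and `KeyCollisionDemo` are hypothetical classes, not Flink's SharedStateRegistry, the remote paths and the sst file name are made up, and the rule "a different file registered under an existing key causes the older file to be deleted" is a simplification (which copy the real registry drops depends on details not modelled here).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy registry (hypothetical, not Flink code): one remote file per key. */
final class ToyRegistry {
    private final Map<String, String> pathByKey = new HashMap<>();
    final Set<String> deleted = new HashSet<>();

    /** Registers a remote file; a different file under the same key evicts the old one. */
    void register(String key, String remotePath) {
        String previous = pathByKey.put(key, remotePath);
        if (previous != null && !previous.equals(remotePath)) {
            deleted.add(previous); // simplified stand-in for "discarded by mistake"
        }
    }
}

final class KeyCollisionDemo {
    /** Registers the same-named sst file of chk2 and chk3 and returns what got deleted. */
    static Set<String> registerBoth(boolean keyByRemotePath) {
        ToyRegistry registry = new ToyRegistry();
        String localName = "000012.sst";               // made-up local sst name
        String chk2File = "hdfs:///jobB/shared/uuid-b"; // made-up remote path (job B)
        String chk3File = "hdfs:///jobC/shared/uuid-c"; // made-up remote path (job C)
        registry.register(keyByRemotePath ? chk2File : localName, chk2File);
        registry.register(keyByRemotePath ? chk3File : localName, chk3File);
        return registry.deleted;
    }
}
```

With the local file name as key, the two uploads collide and one of them is dropped even though chk2 still needs it; with the remote path as key, the key and the file are one-to-one and nothing is dropped.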
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725693#comment-17725693 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Great! Thanks [~Feifan Wang]
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725624#comment-17725624 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Thanks [~roman], I will prepare a PR.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725569#comment-17725569 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Got it, thanks for clarifying.

However, I think that using "Integer.toString(System.identityHashCode(stateHandle))" can easily lead to collisions, thereby causing checkpoint corruption.

But this seems to me an (important) implementation detail that can be discussed in the PR. WDYT?
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725470#comment-17725470 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Thanks for the clarification [~roman]!
{quote}Further, regarding the approach of using unique registry key, I agree with Congxian Qiu , we can just choose a stable register key generation method based on remote file name (such as use md5 digest of remote file name) , which can replace of IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() . The mapping of local sst file name to StreamStateHandle never changed , so the part of RocksDB recovery does not need to be changed.
{quote}
I mean we still use the local file name as the key of the sharedState map in _*IncrementalRemoteKeyedStateHandle*_ and use the remote file path when generating the SharedStateRegistryKey. The changes in _*IncrementalRemoteKeyedStateHandle*_ would look like this:
{code:java}
...
// Still keyed by local file name; this is the "never change" I mentioned above.
private final Map<StateHandleID, StreamStateHandle> sharedState;
...
public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
    ...
    for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
        // changed line: derive the registry key from the remote handle, not the local name
        SharedStateRegistryKey registryKey = generateRegisterKey(sharedStateHandle.getValue());

        StreamStateHandle reference =
                stateRegistry.registerReference(
                        registryKey, sharedStateHandle.getValue(), checkpointID);

        sharedStateHandle.setValue(reference);
    }
}

private static SharedStateRegistryKey generateRegisterKey(StreamStateHandle stateHandle) {
    String keyString = null;
    if (stateHandle instanceof FileStateHandle) {
        keyString = ((FileStateHandle) stateHandle).getFilePath().toString();
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        keyString = ((ByteStreamStateHandle) stateHandle).getHandleName();
    } else {
        keyString = Integer.toString(System.identityHashCode(stateHandle));
    }
    // md5sum is a placeholder helper (not defined here); may be another digest algorithm
    return new SharedStateRegistryKey(md5sum(keyString));
}
{code}
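The snippet in this comment calls an `md5sum` helper that is not defined in the thread. One way such a helper could look is sketched below, using only the JDK's `java.security.MessageDigest`. The class name `RegistryKeys` and the method name `md5Hex` are hypothetical, and nothing here is taken from the merged Flink change; it only shows how a remote file path could be turned into a fixed-length key string.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derives a fixed-length key string from a remote file path. */
final class RegistryKeys {
    private RegistryKeys() {}

    /** Returns the lowercase hex MD5 digest (32 characters) of the UTF-8 bytes of the input. */
    static String md5Hex(String input) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b)); // two hex digits per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide MD5.
            throw new IllegalStateException("MD5 digest not available", e);
        }
    }
}
```

The digest only shortens the key; it is the uniqueness of the remote path that keeps two uploads of a same-named sst file apart. The `identityHashCode` fallback in the snippet has no such guarantee, which is the concern raised in the reply.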
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725418#comment-17725418 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

{quote}On a 64-bit system, about 7.63MB more memory will be used for every one million entries
Is there any other runtime overhead I missed ?
{quote}
I was more concerned about the additional time required to traverse the (mostly single-element) lists. When a checkpoint is subsumed, *all* entries need to be scanned. Adding pointer dereference(s) might break any optimizations that the JVM and CPU would otherwise employ.
{quote}As for the complexity, this approach will indeed increase the operation of the linked list in the registerReference() method and unregisterUnusedState() method. But given that this is easy to implement, and the implementation is cohesive, I think the complexity is acceptable.
{quote}
In my view, simplicity in this part is worth the effort. The problem that SharedStateRegistry solves is already tricky, and we shouldn't complicate it further (higher complexity potentially leads to more bugs and more maintenance effort).
{quote}Further, regarding the approach of using unique registry key, I agree with Congxian Qiu , we can just choose a stable register key generation method based on remote file name (such as use md5 digest of remote file name) , which can replace of IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName() . The mapping of local sst file name to StreamStateHandle never changed , so the part of RocksDB recovery does not need to be changed.
{quote}
I don't fully understand what "never changed" means here.
[Here|https://github.com/apache/flink/blob/fbf7b91424ec626ae56dd2477347a7759db6d5fe/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105], the ID is used to create the local path. If we change the ID to the remote path, the local path will change too. Per my understanding, we cannot change the local path without updating the metadata files. Or am I missing something?
{quote}Whichever approach will be chosen, I am happy to implement it. Can you assign this ticket to me Roman Khachatryan ? looking forward to hearing from you.
{quote}
Sure, thanks for volunteering!
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725387#comment-17725387 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

One overhead I can see is that it will use more memory for storing the next pointer. On a 64-bit system, about 7.63MB more memory will be used for every one million entries; I think that is acceptable. Is there any other runtime overhead I missed?

As for the complexity, this approach will indeed add linked-list operations to the _registerReference()_ and _unregisterUnusedState()_ methods. But given that this is easy to implement, and the implementation is cohesive, I think the complexity is acceptable.

Just to clarify, I think using a unique ID is also a valid approach, but I want to learn how you made the selection.

Further, regarding the approach of using a unique registry key, I agree with [~klion26]: we can just choose a stable register key generation method based on the remote file name (such as an md5 digest of the remote file name), which can replace IncrementalRemoteKeyedStateHandle#createSharedStateRegistryKeyFromFileName(). The mapping of local sst file name to StreamStateHandle never changes, so the RocksDB recovery part does not need to be changed.

Whichever approach is chosen, I am happy to implement it. Can you assign this ticket to me [~roman]? Looking forward to hearing from you.
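The 7.63MB figure in this comment appears to follow from one extra 8-byte reference per entry: 8 × 1,000,000 bytes divided by 1024² is about 7.63 MiB. The sketch below just reproduces that arithmetic; the class and method names are hypothetical, and the 8-byte reference size is an assumption (a 64-bit JVM running with compressed object pointers stores references in 4 bytes, which would halve the number).

```java
/** Hypothetical helper reproducing the back-of-the-envelope memory estimate. */
final class NextPointerOverhead {
    private NextPointerOverhead() {}

    /** Extra mebibytes needed to store one additional reference per registry entry. */
    static double extraMiB(long entries, int bytesPerReference) {
        return entries * (double) bytesPerReference / (1024.0 * 1024.0);
    }
}
```

For example, `NextPointerOverhead.extraMiB(1_000_000, 8)` evaluates to roughly 7.63, and the same call with 4-byte references gives roughly 3.81.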
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725336#comment-17725336 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Thanks for the proposal [~Feifan Wang].

I think it's easier to implement, compared to always unique state IDs. OTOH, it complicates an already complex part of the system, and has some runtime overhead. So I'd rather go with unique state IDs. WDYT?

As for the priority, I'd change it to Major if there are no objections.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725286#comment-17725286 ]

Feifan Wang commented on FLINK-29913:
-------------------------------------

Sorry, I didn't notice this ticket earlier, so I submitted a duplicate one.

[~Yanfei Lei] for the priority, I agree with [~klion26]: since it may break the checkpoint in a valid use case, the priority should be at least *Major*.

[~roman] I think your proposal is valid, but I still want to provide an alternative:
# Make SharedStateRegistry allow registering multiple state objects under the same key. The generation strategy for SharedStateRegistryKey is still determined by CompositeStateHandle, as it is now. Different state objects may be registered under the same key in a force-full checkpoint (active re-upload).
# SharedStateRegistry maintains a linked entry list, sorted by registration time, for each key. A PlaceHolderStreamStateHandle will be replaced by the last one in the entry list.

In this approach, SharedStateRegistry doesn't discard any state object during registration. SharedStateRegistry only deletes a state object when its lastUsedCheckpoint is subsumed.
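The alternative proposed in this comment could look roughly like the following sketch. It is a simplified model under stated assumptions, not the SharedStateRegistry implementation: `MultiEntryRegistry` is a hypothetical class, state handles are plain strings, a `null` handle stands in for PlaceHolderStreamStateHandle, and concurrency and asynchronous deletion are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical registry that keeps every handle registered under a key. */
final class MultiEntryRegistry {
    private static final class Entry {
        final String handle;
        long lastUsedCheckpoint;

        Entry(String handle, long checkpointId) {
            this.handle = handle;
            this.lastUsedCheckpoint = checkpointId;
        }
    }

    // Entries per key, in registration order (oldest first).
    private final Map<String, Deque<Entry>> entries = new HashMap<>();

    /** Registers a handle; a null handle is a placeholder resolved to the latest entry. */
    String register(String key, String handle, long checkpointId) {
        Deque<Entry> list = entries.computeIfAbsent(key, k -> new ArrayDeque<>());
        if (handle == null) {
            Entry last = list.peekLast();
            if (last == null) {
                throw new IllegalStateException("placeholder without prior registration: " + key);
            }
            last.lastUsedCheckpoint = Math.max(last.lastUsedCheckpoint, checkpointId);
            return last.handle;
        }
        for (Entry e : list) {
            if (e.handle.equals(handle)) { // same object registered again
                e.lastUsedCheckpoint = Math.max(e.lastUsedCheckpoint, checkpointId);
                return e.handle;
            }
        }
        list.addLast(new Entry(handle, checkpointId)); // nothing is discarded here
        return handle;
    }

    /** Removes and returns handles last used by a checkpoint older than the lowest retained one. */
    List<String> unregisterUnused(long lowestRetainedCheckpoint) {
        List<String> discarded = new ArrayList<>();
        Iterator<Deque<Entry>> it = entries.values().iterator();
        while (it.hasNext()) {
            Deque<Entry> list = it.next();
            list.removeIf(e -> {
                boolean unused = e.lastUsedCheckpoint < lowestRetainedCheckpoint;
                if (unused) {
                    discarded.add(e.handle);
                }
                return unused;
            });
            if (list.isEmpty()) {
                it.remove();
            }
        }
        return discarded;
    }
}
```

In this model a re-upload under an existing key simply appends a second entry, and the older copy is only returned for deletion once every checkpoint that used it has been subsumed. The cost is the per-key list and the scan over all entries in `unregisterUnused`.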
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633672#comment-17633672 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

[~klion26], we could use the remote filename as well, but we still have to store the mapping from remote to local to initialize RocksDB on recovery. UUIDs are slightly better IMO because they are usually shorter and therefore consume less space in the final _metadata file and as keys in SharedStateRegistry.

[~Yanfei Lei], good question. In case of re-upload, the old state should be discarded eventually when the checkpoint it was created for is subsumed.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633657#comment-17633657 ]

Yanfei Lei commented on FLINK-29913:
------------------------------------

[~roman] Great proposal! I think that maintaining the mapping between local file names and remote files is necessary, because RocksDB needs the file name (xxx.sst) to rebuild its instance.

And I have a question about "always generate unique state handle IDs": if the confirmation notification was missed and the TM re-uploads some state handles, how do we delete those duplicate state handles?
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17633506#comment-17633506 ]

Congxian Qiu commented on FLINK-29913:
--------------------------------------

Sorry for the late reply.

[~Yanfei Lei] for the priority, IMHO, if the user sets {{maxConcurrentCheckpoint > 1 && MAX_RETAINED_CHECKPOINTS > 1}}, then the checkpoints may be broken and the job can't restore from the checkpoint because of a {{FileNotFoundException}}, so I think it deserves a higher priority.

[~roman] your proposal seems valid from my perspective. Maybe changing the logic for generating the registry key (perhaps using the filename in the remote filesystem) is enough to solve the problem here? Please let me know what you think about this, thanks.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631234#comment-17631234 ]

Roman Khachatryan commented on FLINK-29913:
-------------------------------------------

Thanks a lot for noticing and reporting this issue [~Yanfei Lei] and [~klion26]!

I think there is a (conceptually) simple solution: always generate unique state handle IDs. (It was already discussed offline before as it could solve some similar problems like detecting duplicates in SharedStateRegistry.)

The ID doesn't have to have any semantic meaning. RocksDB happens to [store the local path in ID|https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java#L397] and then uses it [on recovery|https://github.com/apache/flink/blob/421f057a7488fd64854a82424755f76b89561a0b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L105]; but it can be a separate field in the handle as well.

I don't see any issues with this approach, including recovery, rescaling, and JM failover cases.

What do you think? Maybe there are some alternatives?
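The idea in this comment, an opaque unique ID with the local path kept as a separate field, might be modelled as below. This is a sketch only: `UniqueHandleId` is a hypothetical class rather than Flink's StateHandleID, and using `java.util.UUID` for the ID is one possible choice, not something the comment prescribes.

```java
import java.util.UUID;

/** Hypothetical handle identity: opaque unique ID plus the local file name for restore. */
final class UniqueHandleId {
    final String id;            // opaque, no semantic meaning, unique per upload
    final String localFileName; // what RocksDB needs on recovery, e.g. an sst file name

    private UniqueHandleId(String id, String localFileName) {
        this.id = id;
        this.localFileName = localFileName;
    }

    /** Creates a fresh identity for one upload of the given local file. */
    static UniqueHandleId create(String localFileName) {
        return new UniqueHandleId(UUID.randomUUID().toString(), localFileName);
    }
}
```

Two uploads of a file with the same local name then get different IDs, so a registry keyed by `id` never sees them as the same entry, while `localFileName` stays available for rebuilding the local directory.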
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629771#comment-17629771 ]

Yanfei Lei commented on FLINK-29913:
------------------------------------

[~klion26] Since maxConcurrentCheckpoint>1 && MAX_CONCURRENT_CHECKPOINTS>1 are less frequently used, I set the priority to Minor.

Under the current checkpoint registration and deletion mechanism, I think this bug will take a while to fix. I'd like to fix this problem after discussion.
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629759#comment-17629759 ]

Congxian Qiu commented on FLINK-29913:
--------------------------------------

[~Yanfei Lei] thanks for creating this ticket and the IT case. Would you like to contribute a fix for this problem?

For the priority, as this may lead to a {{FileNotFoundException}} if {{maxConcurrentCheckpoint > 1}} is set, I think it needs to be at least Critical. What do you think about this?
[jira] [Commented] (FLINK-29913) Shared state would be discarded by mistake when maxConcurrentCheckpoint>1
[ https://issues.apache.org/jira/browse/FLINK-29913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17629630#comment-17629630 ]

Yanfei Lei commented on FLINK-29913:
------------------------------------

I added [ConcurrentCheckpointITCase|https://github.com/apache/flink/commit/db27669d8bbfa995d961fae32f46bc4df676afb7] to reproduce this problem at https://github.com/fredia/flink/tree/test_concurrent_chk.