[ https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119353#comment-17119353 ]
Yun Tang commented on FLINK-10954:
----------------------------------

This ticket has been inactive for more than a year, because I simply worked around the problem and local recovery is not enabled by default in Flink. However, since we plan to enable local recovery by default in FLINK-15507, this problem should now be fixed properly.

Looking at the problem again, the root cause is that the current {{LocalRecoveryDirectoryProviderImpl}} does not perform any check on the provided {{allocationBaseDirs}}. Compare this with how the RocksDB state backend treats the {{localRocksDbDirectories}}, which come from the {{state.backend.rocksdb.localdir}} configuration or from the IOManager's default spilling directories: RocksDB checks whether those {{localRocksDbDirectories}} are valid. However, the RocksDB state backend does not check the {{allocationBaseDirs}} and gives no feedback to {{LocalRecoveryDirectoryProviderImpl}}, so there is currently no way for invalid {{allocationBaseDirs}} to be ignored.

The state backend is the consumer of {{LocalRecoveryDirectoryProvider}}, and {{subtaskSpecificCheckpointDirectory}} is the method that state backends actually use. I suggest adding a method such as {{void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes)}} to tell {{LocalRecoveryDirectoryProvider}} which {{allocationBaseDirs}} are valid. {{subtaskSpecificCheckpointDirectory}} would then only choose from the valid directories.

If we want to allow {{confirmAllocationBaseDirsAvailable}} to give a different answer each time a subtask is submitted, {{LocalRecoveryDirectoryProviderImpl}} can no longer be stateless: it has to remember in which base directory the sub-directory of each earlier checkpointId lives. The current {{LocalRecoveryDirectoryProviderImpl}} is stateless and simply returns one of the {{allocationBaseDirs}} based on {{checkpointId % allocationBaseDirsCount}}.
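To make the proposal concrete, here is a minimal sketch of the "confirm once, stay stateless" variant. It is not Flink's actual {{LocalRecoveryDirectoryProviderImpl}}: the class name, the {{allocationBaseDirectory}} method and the "first confirmation wins" rule are assumptions made for illustration. Only {{confirmAllocationBaseDirsAvailable(Collection<Integer> indexes)}} and the {{checkpointId % count}} selection come from the comment above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch; not the real Flink class. */
class ValidatingDirectoryProviderSketch {

    private final File[] allocationBaseDirs;
    // Indexes into allocationBaseDirs that may be handed out.
    private List<Integer> validIndexes = new ArrayList<>();
    private boolean confirmed = false;

    ValidatingDirectoryProviderSketch(File... allocationBaseDirs) {
        if (allocationBaseDirs.length == 0) {
            throw new IllegalArgumentException("need at least one base dir");
        }
        this.allocationBaseDirs = allocationBaseDirs.clone();
        // Until the state backend reports back, every directory counts as valid.
        for (int i = 0; i < allocationBaseDirs.length; i++) {
            validIndexes.add(i);
        }
    }

    /** Proposed callback. Assumption: only the first call takes effect. */
    void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes) {
        if (confirmed) {
            return; // later tasks obey the first result
        }
        if (indexes.isEmpty()) {
            throw new IllegalArgumentException("no valid base dir left");
        }
        for (int idx : indexes) {
            if (idx < 0 || idx >= allocationBaseDirs.length) {
                throw new IllegalArgumentException("unknown base dir index " + idx);
            }
        }
        // Sort and de-duplicate so the mapping does not depend on call order.
        validIndexes = new ArrayList<>(new TreeSet<>(indexes));
        confirmed = true;
    }

    /** Stateless selection, restricted to the confirmed directories. */
    File allocationBaseDirectory(long checkpointId) {
        int slot = (int) Math.floorMod(checkpointId, validIndexes.size());
        return allocationBaseDirs[validIndexes.get(slot)];
    }
}
```

Because the valid set is fixed after the first confirmation, the same checkpoint id always maps to the same directory, so no per-checkpoint bookkeeping is needed in this variant.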
I think the stateful variant is possible by introducing a map of {{<checkpointId -> base dir index>}}. The same checkpoint id should only be used twice, once to create and once to discard, so we can remove the record on the second call. Alternatively, we could keep {{LocalRecoveryDirectoryProviderImpl}} stateless and allow {{confirmAllocationBaseDirsAvailable}} to initialize the valid directories only once; tasks submitted later would have to follow that first result.

What do you think? [~sewen], [~liyu]

> Hardlink from files of previous local stored state might cross devices
> ----------------------------------------------------------------------
>
>                 Key: FLINK-10954
>                 URL: https://issues.apache.org/jira/browse/FLINK-10954
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.2
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.11.0
>
>
> Currently, local recovery's base directories are initialized from
> '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is not
> set. In a Yarn environment, the tmp dirs are replaced by its '{{LOCAL_DIRS}}',
> which might consist of directories on different devices, such as
> /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB
> is initialized from the IOManager's spillingDirectories, which might be located
> on a different device from local recovery's folder. However, hard links between
> different devices are not allowed, so the exception below is thrown:
> {code:java}
> java.nio.file.FileSystemException: target -> souce: Invalid cross-device link
> {code}
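The stateful alternative from the comment, a map of {{<checkpointId -> base dir index>}} that is written on create and cleared on discard, could look roughly like the sketch below. All class and method names are hypothetical, and the sketch is not thread-safe; it only illustrates the bookkeeping, not how Flink would wire it in.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the checkpointId -> base dir index bookkeeping. */
class StatefulDirectoryChooserSketch {

    private final Map<Long, Integer> baseDirIndexByCheckpoint = new HashMap<>();

    /** First call for a checkpoint: pick among the currently valid indexes and remember the choice. */
    int indexForCreate(long checkpointId, List<Integer> currentlyValidIndexes) {
        if (currentlyValidIndexes.isEmpty()) {
            throw new IllegalArgumentException("no valid base dir available");
        }
        int slot = (int) Math.floorMod(checkpointId, currentlyValidIndexes.size());
        int index = currentlyValidIndexes.get(slot);
        baseDirIndexByCheckpoint.put(checkpointId, index);
        return index;
    }

    /** Second call for a checkpoint: return the remembered index and drop the record. */
    int indexForDiscard(long checkpointId) {
        Integer index = baseDirIndexByCheckpoint.remove(checkpointId);
        if (index == null) {
            throw new IllegalStateException("unknown checkpoint " + checkpointId);
        }
        return index;
    }

    /** Number of checkpoints that were created but not yet discarded. */
    int trackedCheckpoints() {
        return baseDirIndexByCheckpoint.size();
    }
}
```

With this bookkeeping the set of valid directories may change between task submissions, because the discard call no longer recomputes the directory from the checkpoint id.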