[ 
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119353#comment-17119353
 ] 

Yun Tang commented on FLINK-10954:
----------------------------------

This ticket has been inactive for more than a year because I simply worked 
around the problem and local recovery is not enabled by default in Flink.

However, since we plan to enable local recovery by default in FLINK-15507, 
this problem should now be fixed properly.

Looking back at this problem, the root cause is that the current 
{{LocalRecoveryDirectoryProviderImpl}} lacks any check on the provided 
{{allocationBaseDirs}}.

Compare this with how the RocksDB state backend treats the 
{{localRocksDbDirectories}}, which come from the 
{{state.backend.rocksdb.localdir}} configuration or from the default spilling 
directories of the IOManager: RocksDB checks whether those 
{{localRocksDbDirectories}} are valid. The RocksDB state backend does not, 
however, run any check on the {{allocationBaseDirs}}, and it gives no 
feedback to {{LocalRecoveryDirectoryProviderImpl}} that would allow invalid 
{{allocationBaseDirs}} to be skipped.
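
To illustrate what such a check could look like, here is a minimal sketch. It assumes that "valid" means "an existing, writable directory on the same file store as the RocksDB working directory", which is only an approximation of "a hard link is possible". {{BaseDirCheck}} and {{usableIndexes}} are hypothetical names, not existing Flink code.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink. */
final class BaseDirCheck {

    /**
     * Returns the indexes of all base dirs that exist, are writable, and live on
     * the same file store as the given RocksDB working directory.
     */
    static List<Integer> usableIndexes(List<Path> allocationBaseDirs, Path rocksDbDir)
            throws IOException {
        FileStore target = Files.getFileStore(rocksDbDir);
        List<Integer> usable = new ArrayList<>();
        for (int i = 0; i < allocationBaseDirs.size(); i++) {
            Path dir = allocationBaseDirs.get(i);
            // isDirectory is checked first, so getFileStore is only called on existing paths.
            if (Files.isDirectory(dir)
                    && Files.isWritable(dir)
                    && Files.getFileStore(dir).equals(target)) {
                usable.add(i);
            }
        }
        return usable;
    }
}
```

The returned indexes are what a state backend could report back to the directory provider.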

The state backend is the consumer of {{LocalRecoveryDirectoryProvider}}, and 
{{subtaskSpecificCheckpointDirectory}} is the method that state backends 
actually use. I therefore suggest adding a method like 
{{void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes)}} to tell 
{{LocalRecoveryDirectoryProvider}} which {{allocationBaseDirs}} are valid. 
{{subtaskSpecificCheckpointDirectory}} would then only choose from the valid 
directories.
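
A rough sketch of how that contract might look. Only {{confirmAllocationBaseDirsAvailable}} is taken from the proposal above; {{DirectoryProviderSketch}}, its implementation class, and {{selectBaseDirectory}} are hypothetical stand-ins for the real provider and its directory lookup.

```java
import java.io.File;
import java.util.Collection;
import java.util.TreeSet;

/** Hypothetical sketch of the proposed contract; not the actual Flink interface. */
interface DirectoryProviderSketch {
    void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes);

    File selectBaseDirectory(long checkpointId);
}

final class DirectoryProviderSketchImpl implements DirectoryProviderSketch {
    private final File[] allocationBaseDirs;
    private volatile int[] validIndexes;

    DirectoryProviderSketchImpl(File[] allocationBaseDirs) {
        this.allocationBaseDirs = allocationBaseDirs.clone();
        // Until the consumer reports back, every base dir is assumed to be valid.
        int[] all = new int[allocationBaseDirs.length];
        for (int i = 0; i < all.length; i++) {
            all[i] = i;
        }
        this.validIndexes = all;
    }

    @Override
    public void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes) {
        // Sort and de-duplicate so the selection is deterministic.
        TreeSet<Integer> sorted = new TreeSet<>(indexes);
        if (sorted.isEmpty() || sorted.first() < 0 || sorted.last() >= allocationBaseDirs.length) {
            throw new IllegalArgumentException("Invalid base dir indexes: " + indexes);
        }
        validIndexes = sorted.stream().mapToInt(Integer::intValue).toArray();
    }

    @Override
    public File selectBaseDirectory(long checkpointId) {
        int[] valid = validIndexes;
        // Same modulo rotation as before, but only over the confirmed directories.
        int slot = (int) Math.floorMod(checkpointId, (long) valid.length);
        return allocationBaseDirs[valid[slot]];
    }
}
```

With three base dirs and a confirmation of indexes 0 and 2, checkpoint 4 would map to the first directory and checkpoint 5 to the third one.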

If we want to allow {{confirmAllocationBaseDirsAvailable}} to give a 
different answer each time a subtask is submitted, 
{{LocalRecoveryDirectoryProviderImpl}} can no longer be stateless: it has to 
remember in which base directory the sub-directory of a previous checkpointId 
lives. The current {{LocalRecoveryDirectoryProviderImpl}} is stateless and 
simply returns one of the {{allocationBaseDirs}} based on {{checkpointId % 
allocationBaseDirsCount}}. I think this could be done by introducing a map 
of {{<checkpointId -> base dir index>}}. The same checkpoint id should only 
be requested twice, once to create and once to discard, so that we could remove 
the record on the second call.
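
A minimal sketch of this stateful variant, assuming the create and discard calls can be told apart. {{StatefulDirectoryChooser}}, {{indexForCreate}} and {{indexForDiscard}} are hypothetical names, and plain {{int[]}} is used instead of {{Collection<Integer>}} for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; not the actual LocalRecoveryDirectoryProviderImpl. */
final class StatefulDirectoryChooser {
    // checkpointId -> index of the base dir chosen when the checkpoint was created
    private final Map<Long, Integer> indexByCheckpoint = new ConcurrentHashMap<>();
    private volatile int[] validIndexes;

    StatefulDirectoryChooser(int[] initiallyValid) {
        this.validIndexes = initiallyValid.clone();
    }

    /** May be called again with a different answer on every task submission. */
    void confirmAllocationBaseDirsAvailable(int[] indexes) {
        if (indexes.length == 0) {
            throw new IllegalArgumentException("At least one base dir must be valid");
        }
        this.validIndexes = indexes.clone();
    }

    /** First call for a checkpoint: pick an index from the valid ones and remember it. */
    int indexForCreate(long checkpointId) {
        int[] valid = validIndexes;
        return indexByCheckpoint.computeIfAbsent(
                checkpointId, id -> valid[(int) Math.floorMod(id, (long) valid.length)]);
    }

    /** Second call for a checkpoint: return the remembered index and drop the record. */
    int indexForDiscard(long checkpointId) {
        Integer index = indexByCheckpoint.remove(checkpointId);
        if (index == null) {
            throw new IllegalStateException("Unknown checkpoint " + checkpointId);
        }
        return index;
    }
}
```

The point of the map is that a checkpoint created before a re-confirmation is still discarded from the directory it was written to.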

Alternatively, we could keep {{LocalRecoveryDirectoryProviderImpl}} 
stateless and only allow {{confirmAllocationBaseDirsAvailable}} to take 
effect once, so that tasks submitted later have to obey the result of the 
first initialization.
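
This second option could be sketched as follows, assuming a first-caller-wins rule. {{OnceConfirmedChooser}} and {{indexFor}} are hypothetical names, and the {{boolean}} return value is my addition to show whether a call took effect.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; not the actual LocalRecoveryDirectoryProviderImpl. */
final class OnceConfirmedChooser {
    private final int allocationBaseDirsCount;
    private final AtomicReference<int[]> confirmed = new AtomicReference<>();

    OnceConfirmedChooser(int allocationBaseDirsCount) {
        this.allocationBaseDirsCount = allocationBaseDirsCount;
    }

    /** Only the first confirmation is accepted; later calls return false and change nothing. */
    boolean confirmAllocationBaseDirsAvailable(int[] indexes) {
        if (indexes.length == 0) {
            throw new IllegalArgumentException("At least one base dir must be valid");
        }
        return confirmed.compareAndSet(null, indexes.clone());
    }

    /** Pure function of checkpointId and the one-time confirmation, so no per-checkpoint state. */
    int indexFor(long checkpointId) {
        int[] valid = confirmed.get();
        if (valid == null) {
            // Nothing confirmed yet: fall back to the plain modulo over all base dirs.
            return (int) Math.floorMod(checkpointId, (long) allocationBaseDirsCount);
        }
        return valid[(int) Math.floorMod(checkpointId, (long) valid.length)];
    }
}
```

Because the confirmed set never changes after the first call, create and discard for the same checkpoint id resolve to the same directory without any bookkeeping.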

What do you think? [~sewen], [~liyu]

> Hardlink from files of previous local stored state might cross devices
> ----------------------------------------------------------------------
>
>                 Key: FLINK-10954
>                 URL: https://issues.apache.org/jira/browse/FLINK-10954
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.2
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 1.11.0
>
>
> Currently, local recovery's base directories are initialized from 
> '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is 
> not set. In a Yarn environment, the tmp dirs are replaced by its 
> '{{LOCAL_DIRS}}', which might consist of directories on different devices, 
> such as /dump/1/nm-local-dir and /dump/2/nm-local-dir. The local directory for 
> RocksDB is initialized from the IOManager's spillingDirectories, which might 
> be located on a different device from local recovery's folder. However, hard 
> links between different devices are not allowed, so the exception below is thrown:
> {code:java}
> java.nio.file.FileSystemException: target -> source: Invalid cross-device link
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
