[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311347#comment-16311347 ]
ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/5239

    [FLINK-8360] Implement task-local state recovery

## What is the purpose of the change

*This change introduces the task-local recovery feature. The main idea is to have a secondary, local copy of the checkpointed state, while there is still a primary copy in DFS that we report to the checkpoint coordinator.*

*Recovery can attempt to restore from the secondary local copy, if available, to save network bandwidth. This requires that the assignment of tasks to slots is as sticky as possible.*

*For starters, we will implement this feature for all managed keyed states. We can easily extend it to all other state types (e.g. operator state) later, because the basic infrastructure is already in place.*

*This PR is on top of #4745.*

## Brief change log

- *Introduced `TaskExecutorLocalStateStoresManager` per task manager. This class manages one `TaskLocalStateStore` for each task running on the task manager.*
- *`TaskLocalStateStore` stores and provides the local state for one task. Reporting of checkpointed states goes through this class. The primary state handles are forwarded to the checkpoint coordinator; the optional secondary (local) state is stored in the local store.*
- *`LocalRecoveryDirectoryProvider` is used by `TaskLocalStateStore` to manage the local state directory or directories.*
- *`StreamTaskStateManager` uses the `TaskLocalStateStore` to restore state for its operators.*
- *File-based local state is created through `DuplicatingCheckpointOutputStream`, a stream that duplicates writes into two internal streams, typically one primary against a DFS and one secondary against the local FS.*
- *RocksDB's incremental checkpoints are not based on a single file, but on a directory. As we do not require reference counting for local files (we can use hardlinks), we can deal with checkpoint directories as a whole. We introduced `IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle` for this purpose.*

## Verifying this change

This change added tests and can be verified as follows:

- *If we activate the local recovery feature on the state backend (via `#setLocalRecoveryMode(...)`) and induce a task failure through a user-code exception, the logs should show that managed keyed state is recovered from the local FS.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (yes, slightly and in a way that should not matter.)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented?
(Documentation pending)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink task-local-recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5239

----
commit 769aa8619f7da4512e51bfc13110459b50d67c0c
Author: Stefan Richter <s.richter@...>
Date:   2017-08-21T12:31:38Z

    [FLINK-7719] [checkpoints] Send checkpoint id to task as part of deployment descriptor when resuming

commit 194d289eb5d67494fb7242dda3589171e0bcd4f6
Author: Stefan Richter <s.richter@...>
Date:   2017-09-27T13:10:52Z

    [FLINK-7720] [checkpoints] Centralize creation of backends and state related resources

commit 4beb98837fbb1c99d608b6b5ceea1fba1989fbd8
Author: Stefan Richter <s.richter@...>
Date:   2017-11-28T17:05:41Z

    smaller refactorings of method and class names

commit 2dc9f30bcad35ef2a63a4014e2fa7f9b6411d811
Author: Stefan Richter <s.richter@...>
Date:   2017-11-28T17:06:05Z

    introduce state object collection for better OO.

commit 3c9d39d8ca4c1370467d5596f2c3ef51b9aff043
Author: Stefan Richter <s.richter@...>
Date:   2017-11-29T09:59:05Z

    Split generic type in Snapshotable interface

commit 6cf67e5fc04997873d2d9f4abfaa6be962bb464f
Author: Stefan Richter <s.richter@...>
Date:   2017-11-29T14:33:32Z

    Introduce explicit snapshot result type with 2 locations

commit 5d829f235fa433af4311ca0c943843b879ed4627
Author: Stefan Richter <s.richter@...>
Date:   2017-11-30T11:00:48Z

    Deep compare handle broken. TODO could use matcher instead

commit 238ea9881900f26c6e78af093e96fad348cab002
Author: Stefan Richter <s.richter@...>
Date:   2017-12-01T11:13:43Z

    javadoc on new classes.

commit 5d9c414cec2bc9410ed0972d37390200e915e325
Author: Stefan Richter <s.richter@...>
Date:   2017-12-04T14:12:00Z

    Preparing, test related refactorings

commit adbe507c24eda44088745159444d1ae5011e2bda
Author: Stefan Richter <s.richter@...>
Date:   2017-12-04T14:12:12Z

    Additional unit test for forwarding to/from TaskStateManager and StateObjectCollection.

commit 5a8b43e33918b84aec98904a77061d69ec1ff51c
Author: Stefan Richter <s.richter@...>
Date:   2017-12-05T17:46:07Z

    Introduce configuration for local recovery.

commit 0c77cf022fd10ee60c2a5ffe649f4a3cec2fb5fa
Author: Stefan Richter <s.richter@...>
Date:   2017-12-06T11:00:12Z

    Introduce snapshot strategy and some connected refactorings.

commit 48ac41b78cfede2b3bc34f0f9d61f9e2374aff3b
Author: Stefan Richter <s.richter@...>
Date:   2017-12-06T16:52:59Z

    file based local recovery for heap keyed backend

commit f4946175060b0b05b463b13b2e94f8cbd5457392
Author: Stefan Richter <s.richter@...>
Date:   2017-12-11T11:01:54Z

    [BUGFIX] Lifecycle of operator close/dispose is of w.r.t finished flag.

commit e4bcc7f4aa67a9f91874d5ae765517ab8aa9166f
Author: Stefan Richter <s.richter@...>
Date:   2017-12-11T14:08:29Z

    Integration fixup.

commit 95f60dff6407f9f5e59ed975b61ece3071c3daae
Author: Stefan Richter <s.richter@...>
Date:   2017-12-12T16:07:08Z

    introduce disposable interface

commit 37eb9b7ce99171a20c07ece1dc11b2fc3122292d
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T10:07:51Z

    Implement restore path for file-based local snapshots

commit a12c1c207e1a93db43bb4077646fbd8eb6c6b13b
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T18:04:21Z

    Cleanup organization improvements

commit 9724491fe4c8ec0b1e318e24a76db2e96541a94e
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T18:05:00Z

    RocksDB local incremental snapshots

commit e9811c0735809021af7c8175bacab3ce6c046a22
Author: Stefan Richter <s.richter@...>
Date:   2017-12-15T15:01:55Z

    Recovery path for RocksDB local incremental snapshots.

commit 588f32fd5d91db22cbf7b14c4726a64408defd2a
Author: Stefan Richter <s.richter@...>
Date:   2017-12-18T16:15:55Z

    Additional unit tests (WIP).

commit c1b5a8337944bcae3ac1fc012e8c16bc0e6265a7
Author: Stefan Richter <s.richter@...>
Date:   2017-12-30T19:23:48Z

    Abandon old OperatorStateHandles (replace by newer OperatorSubtaskState).

commit e7e602768497b3cad0567ad1edf58c4bf67d1018
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T13:21:04Z

    Additional unit tests (WIP) - 4.

commit 3fff0889dafefdb7cb0920b692ed9fdb3645d3b4
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T16:38:33Z

    Change directory structure so that we can delete local state directories based on allocation id in the (flip-6) future.

commit 958baaa5c118b431c18df298f3f138133ac42466
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T17:30:48Z

    Make checkstyle happy :-)

----

> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main
> idea is to have a secondary, local copy of the checkpointed state, while
> there is still a primary copy in DFS that we report to the checkpoint
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available,
> to save network bandwidth. This requires that the assignment of tasks to
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and
> can easily extend it to all other state types (e.g. operator state) later.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
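
For readers unfamiliar with the duplicating stream mentioned in the change log, the idea can be sketched roughly as follows. This is a minimal illustration only and not the code from the pull request: the class name `DuplicatingStreamSketch` and the method `isSecondaryUsable()` are hypothetical, and the choice to tolerate failures of the secondary stream (while primary failures still propagate) is an assumption, based on the description of the local copy as optional.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch; NOT Flink's DuplicatingCheckpointOutputStream.
class DuplicatingStreamSketch extends OutputStream {

    private final OutputStream primary;   // e.g. a stream against a DFS
    private final OutputStream secondary; // e.g. a stream against the local FS

    // Assumption: the first secondary failure is remembered and the local
    // copy is abandoned, while writing to the primary continues.
    private IOException secondaryFailure;

    DuplicatingStreamSketch(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b); // primary failures propagate to the caller
        if (secondaryFailure == null) {
            try {
                secondary.write(b);
            } catch (IOException e) {
                secondaryFailure = e;
            }
        }
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        primary.write(buf, off, len);
        if (secondaryFailure == null) {
            try {
                secondary.write(buf, off, len);
            } catch (IOException e) {
                secondaryFailure = e;
            }
        }
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        if (secondaryFailure == null) {
            try {
                secondary.flush();
            } catch (IOException e) {
                secondaryFailure = e;
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            secondary.close();
        } catch (IOException e) {
            if (secondaryFailure == null) {
                secondaryFailure = e;
            }
        } finally {
            primary.close(); // always close the primary stream
        }
    }

    /** True if every write so far also reached the secondary stream. */
    boolean isSecondaryUsable() {
        return secondaryFailure == null;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream dfs = new ByteArrayOutputStream();
        ByteArrayOutputStream local = new ByteArrayOutputStream();
        try (DuplicatingStreamSketch out = new DuplicatingStreamSketch(dfs, local)) {
            out.write(new byte[] {1, 2, 3}, 0, 3);
            out.write(4);
        }
        System.out.println(dfs.size() + " bytes primary, " + local.size() + " bytes secondary");
    }
}
```

In this sketch a caller would check `isSecondaryUsable()` after closing the stream and keep the local copy only if it returns true; the primary copy is valid either way.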