[ https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871885#comment-15871885 ]
ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/3346

    Selfcontained

This is based on #3345, and only the last two commits are relevant here. I've separated the test changes (last commit) from the main changes (second-to-last commit) for easier review. I would squash them before merging.

The change looks more involved than it actually is. It mostly routes new information along with the checkpoint barriers, which touches a lot of places.

The main change is to add `CheckpointOptions` to the triggered checkpoint messages (coordinator to barrier-injecting tasks) and to the barriers (flowing inline with the data):

```java
public class CheckpointOptions {

  // Type of checkpoint
  // => FULL_CHECKPOINT
  // => SAVEPOINT
  @NonNull
  CheckpointType getCheckpointType();

  // Custom target location. This is a String, because for future
  // backends it can be a logical location like a DB table.
  @Nullable
  String getTargetLocation();
}
```

This class would be the place to define more options for performing the checkpoints (for example for incremental checkpoints). @StephanEwen was involved with the design of incremental checkpoints and could probably comment best on whether this is in line with that design.

These options are forwarded via the `StreamTask` to the `StreamOperator`s and `Snapshotable` backends. The `AbstractStreamOperator` checks the options and either i) forwards the shared per-operator `CheckpointStreamFactory` (as of #3312), or ii) creates a custom savepoint stream factory (one per savepoint).
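The branching described above can be sketched roughly as follows. This is only an illustration, not the code from the pull request: `StreamFactorySelectionSketch`, its nested `StreamFactory` stand-in, the `select` helper, and the `defaultSavepointDir` parameter are all hypothetical names, and the fallback to a default directory when no custom location is set is an assumption.

```java
import java.util.Objects;

// Simplified, hypothetical stand-ins for the types named in the description.
// This is not Flink's code; it only illustrates the branching.
final class StreamFactorySelectionSketch {

    enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }

    /** Options that travel with the trigger message and the barriers. */
    static final class CheckpointOptions {
        private final CheckpointType type;
        private final String targetLocation; // null means "no custom location"

        CheckpointOptions(CheckpointType type, String targetLocation) {
            this.type = Objects.requireNonNull(type, "type");
            this.targetLocation = targetLocation;
        }

        CheckpointType getCheckpointType() { return type; }

        String getTargetLocation() { return targetLocation; }
    }

    /** Stand-in for a stream factory; it only records where it would write. */
    static final class StreamFactory {
        final String target;

        StreamFactory(String target) { this.target = target; }
    }

    /**
     * Regular checkpoints reuse the shared per-operator factory; every
     * savepoint gets a fresh factory for its own target location.
     * The default-directory fallback is an assumption of this sketch.
     */
    static StreamFactory select(
            CheckpointOptions options, StreamFactory shared, String defaultSavepointDir) {
        if (options.getCheckpointType() == CheckpointType.SAVEPOINT) {
            String custom = options.getTargetLocation();
            return new StreamFactory(custom != null ? custom : defaultSavepointDir);
        }
        return shared;
    }

    public static void main(String[] args) {
        StreamFactory shared = new StreamFactory("/checkpoints/operator-1");
        CheckpointOptions savepoint =
                new CheckpointOptions(CheckpointType.SAVEPOINT, "/savepoints/custom");
        CheckpointOptions checkpoint =
                new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);

        System.out.println(select(checkpoint, shared, "/savepoints/default").target);
        System.out.println(select(savepoint, shared, "/savepoints/default").target);
    }
}
```

The only part that needs backend support is the savepoint branch, where a dedicated stream factory has to be created.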
For this, the state backends provide the following new method:

```java
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```

The `MemoryStateBackend` returns the regular stream factory and the `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all checkpoint streams to a single directory (instead of the regular subfolders per checkpoint).

We end up with the following directory layout for savepoints:

```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
      |
      +- _metadata (one per savepoint)
      +- :uuid (one data file per StreamTask)
      +- ...
      +- :uuid
```

I decided to include a prefix of the job ID in the savepoint directory name, because I think this could be helpful for mapping savepoints to jobs, which is a manual task.

It's important to make sure that this is in line with upcoming changes for incremental checkpoints (discussion on mailing list) and FLINK-5820.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink selfcontained

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3346

----
commit 64e1f8892597dd26489774fe494fd7b211d789a9
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-15T16:52:40Z

    [FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics

    Instead of acknowledging checkpoints with the CheckpointMetaData, make
    the acknowledgement explicit by ID and CheckpointMetrics. The rest is not
    needed.

commit 0b8fa02a150ae6d5891e04d1fce6de748a283aaf
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-15T17:16:44Z

    [FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData

commit d023159d0fef1da24373d7880e11d0a36afbd7ef
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T15:52:32Z

    [FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

commit 2496068371bf1aac9e2ce6223002c1d9a043930f
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T16:56:23Z

    [FLINK-5763] [checkpoints] Add CheckpointOptions

    Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
    to barrier-injecting tasks) and barriers (flowing inline with the data):

    ```java
    public class CheckpointOptions {

      // Type of checkpoint
      // => FULL_CHECKPOINT
      // => SAVEPOINT
      @NonNull
      CheckpointType getCheckpointType();

      // Custom target location. This is a String, because for future
      // backends it can be a logical location like a DB table.
      @Nullable
      String getTargetLocation();
    }
    ```

    This class would be the place to define more options for performing the
    checkpoints (for example for incremental checkpoints).

    These options are forwarded via the `StreamTask` to the `StreamOperator`s and
    `Snapshotable` backends. The `AbstractStreamOperator` checks the options and
    either i) forwards the shared per-operator `CheckpointStreamFactory` (as of

    For this, the state backends provide the following new method:

    ```
    CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
    ```

    The `MemoryStateBackend` returns the regular stream factory and the
    `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
    checkpoint streams to a single directory (instead of the regular subfolders
    per checkpoint).

    We end up with the following directory layout for savepoints:

    ```
    +---------------------------+
    | :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
    +---------------------------+
      | +---------------------------------------+
      +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
        +---------------------------------------+
          |
          +- _metadata (one per savepoint)
          +- :uuid (one data file per StreamTask)
          +- ...
          +- :uuid
    ```

commit ca80d68ab7faba663f4c384156a90d58de0ebf82
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T16:56:37Z

    [FLINK-5763] [checkpoints] Adjust tests

----

> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The current state backend abstraction has the limitation that each piece of
> state is only meaningful in the context of its state handle. There is no
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the process of failures. When a
> TaskManager hands over a state handle to the JobManager and either of them
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and
> the checkpoint metadata was already disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state
> handle is individually released. For large jobs, this means 1000s of release
> operations (typically file deletes) per checkpoint, which can be expensive on
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state
> backend, not the "completed checkpoint store". The latter only stores the
> pointers to the available latest checkpoints (either in process or in
> ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed
> state that belongs to only one specific checkpoint (shared state comes as
> part of incremental checkpointing).
>   5. The StateBackend needs to have a hook to drop all checkpointed state up
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state
> currently (transitively for RocksDB as well), this means a re-structuring of
> the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/   <-- shared checkpoint data
>                               /chk-1/... <-- data exclusive to checkpoint 1
>                               /chk-2/... <-- data exclusive to checkpoint 2
>                               /chk-3/... <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.
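
To illustrate why the proposed layout makes the "drop everything up to checkpoint N" hook cheap, here is a minimal sketch that removes whole `chk-N` directories instead of releasing individual state handles. It is not part of the issue or the pull request: the class and method names are hypothetical, and it assumes a local file system accessed through `java.nio.file` rather than Flink's own file system abstraction.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch; names and the use of java.nio.file are assumptions.
final class CheckpointDirCleanupSketch {

    private static final String PREFIX = "chk-";

    /**
     * Deletes every chk-N directory under jobDir with N <= upToId and returns
     * how many directories were removed. The shared/ directory is not touched.
     */
    static int dropCheckpointsUpTo(Path jobDir, long upToId) throws IOException {
        List<Path> toDelete = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(jobDir, PREFIX + "*")) {
            for (Path dir : dirs) {
                String suffix = dir.getFileName().toString().substring(PREFIX.length());
                try {
                    if (Long.parseLong(suffix) <= upToId) {
                        toDelete.add(dir);
                    }
                } catch (NumberFormatException e) {
                    // Not a checkpoint directory; leave it alone.
                }
            }
        }
        for (Path dir : toDelete) {
            deleteRecursively(dir);
        }
        return toDelete.size();
    }

    /** Deletes root and everything below it, children before parents. */
    static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway layout: shared/, chk-1/, chk-2/, chk-3/.
        Path jobDir = Files.createTempDirectory("job1-id");
        Files.createDirectories(jobDir.resolve("shared"));
        for (int i = 1; i <= 3; i++) {
            Path chk = Files.createDirectories(jobDir.resolve(PREFIX + i));
            Files.write(chk.resolve("state"), new byte[] {1});
        }
        System.out.println(
                "removed " + dropCheckpointsUpTo(jobDir, 2) + " checkpoint directories");
    }
}
```

Because each checkpoint's exclusive data lives under one directory, cleanup does not depend on any state handle having survived a failure, which is the lingering-state problem the issue describes.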