[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15871885#comment-15871885
 ] 

ASF GitHub Bot commented on FLINK-5820:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/3346

    Selfcontained

    This is based on #3345; only the last two commits are relevant here.
    I've separated the test changes (last commit) from the main changes
    (second-to-last commit) for easier review and would squash them before
    merging. The change looks more involved than it actually is: it mostly
    routes new information along with the checkpoint barriers, which touches
    a lot of places.
    
    The main change is to add `CheckpointOptions` to the triggered checkpoint 
messages (coordinator to barrier injecting tasks) and barriers (flowing inline 
with the data):
    
    ```java
    public class CheckpointOptions {
        
        // Type of checkpoint
        // => FULL_CHECKPOINT
        // => SAVEPOINT
        @NonNull
        CheckpointType getCheckpointType();
    
        // Custom target location. This is a String, because for future
        // backends it can be a logical location like a DB table.
        @Nullable
        String getTargetLocation();
    
    }
    ```
    
    This class would be the place to define more options for performing
    checkpoints (for example, for incremental checkpoints). @StephanEwen was
    involved in the design of incremental checkpoints and can probably comment
    best on whether this is in line with that design.
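    
    As a rough illustration of the shape described above, an immutable holder
    could look like the following. This is only a sketch: the class name
    `CheckpointOptionsSketch`, the nested enum, and the two static factory
    methods are assumptions made for this example, not code from this pull
    request.
    
    ```java
    import java.util.Objects;
    
    // Hypothetical sketch of an immutable options holder; not the PR's class.
    final class CheckpointOptionsSketch {
    
        // The two checkpoint types named in the description.
        enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }
    
        private final CheckpointType checkpointType;
        private final String targetLocation; // may be null (no custom target)
    
        private CheckpointOptionsSketch(CheckpointType checkpointType, String targetLocation) {
            this.checkpointType = Objects.requireNonNull(checkpointType, "checkpointType");
            this.targetLocation = targetLocation;
        }
    
        // Assumed factory method: a regular checkpoint without a custom target.
        static CheckpointOptionsSketch forFullCheckpoint() {
            return new CheckpointOptionsSketch(CheckpointType.FULL_CHECKPOINT, null);
        }
    
        // Assumed factory method: a savepoint, optionally with a custom target.
        static CheckpointOptionsSketch forSavepoint(String targetLocation) {
            return new CheckpointOptionsSketch(CheckpointType.SAVEPOINT, targetLocation);
        }
    
        CheckpointType getCheckpointType() {
            return checkpointType;
        }
    
        String getTargetLocation() {
            return targetLocation;
        }
    }
    ```
    
    Keeping the holder immutable means the same instance can be attached to
    the trigger message and to every barrier without defensive copies.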
    
    These options are forwarded via the `StreamTask` to the `StreamOperator`s
    and `Snapshotable` backends. The `AbstractStreamOperator` checks the options
    and either
    i) forwards the shared per-operator `CheckpointStreamFactory` (as of
    #3312), or
    ii) creates a custom savepoint stream factory (one per savepoint).
    
    For this, the state backends provide the following new method:
    
    ```java
    CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
    ```
    
    The `MemoryStateBackend` returns the regular stream factory and the 
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all 
checkpoint streams to a single directory (instead of the regular sub folders 
per checkpoint).
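    
    The selection between the shared factory and a per-savepoint factory
    described above could be sketched roughly as follows. All types here are
    placeholders introduced for the example (`StreamFactory` stands in for
    `CheckpointStreamFactory`, and `select` is a hypothetical helper), so this
    shows the control flow only, not the actual `AbstractStreamOperator` code.
    
    ```java
    import java.util.function.Function;
    
    // Hypothetical sketch of the factory selection; all types are placeholders.
    final class StreamFactorySelection {
    
        enum CheckpointType { FULL_CHECKPOINT, SAVEPOINT }
    
        // Stand-in for CheckpointStreamFactory; only records the target.
        interface StreamFactory {
            String targetDirectory();
        }
    
        // Returns the shared per-operator factory for regular checkpoints and
        // asks the backend for a fresh, savepoint-specific factory otherwise.
        static StreamFactory select(
                CheckpointType type,
                String targetLocation,
                StreamFactory sharedFactory,
                Function<String, StreamFactory> savepointFactoryCreator) {
            if (type == CheckpointType.SAVEPOINT) {
                return savepointFactoryCreator.apply(targetLocation);
            }
            return sharedFactory;
        }
    }
    ```
    
    In this sketch, a memory-style backend would pass a creator that simply
    returns the shared factory, while a file-system-style backend would pass a
    creator that builds a new factory rooted in the savepoint directory.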
    
    We end up with the following directory layout for savepoints:
    
    ```
    +---------------------------+
    | :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
    +---------------------------+
      | +---------------------------------------+
      +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
        +---------------------------------------+
          |
          +- _metadata (one per savepoint)
          +- :uuid (one data file per StreamTask)
          +- ...
          +- :uuid
    ```
    
    I decided to include a prefix of the job ID in the savepoint directory
    name because I think it helps with mapping savepoints to jobs, which is
    a manual task.
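    
    A minimal sketch of how such a directory name could be assembled, following
    the `savepoint-:jobId(0, 6)-:random_suffix` pattern from the layout above.
    The helper class, the use of a hex string for the job ID, and the
    UUID-derived 12-character suffix are assumptions made for illustration; the
    pull request may generate the suffix differently.
    
    ```java
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.UUID;
    
    // Hypothetical helper; naming scheme taken from the layout above.
    final class SavepointDirectoryNames {
    
        // Builds "savepoint-<first 6 chars of job ID>-<suffix>".
        static String directoryName(String jobIdHex, String randomSuffix) {
            String prefix = jobIdHex.substring(0, Math.min(6, jobIdHex.length()));
            return "savepoint-" + prefix + "-" + randomSuffix;
        }
    
        // Resolves a fresh savepoint directory under the given root directory.
        // The suffix length (12 hex characters) is an arbitrary choice here.
        static Path resolve(String rootSavepointDirectory, String jobIdHex) {
            String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 12);
            return Paths.get(rootSavepointDirectory).resolve(directoryName(jobIdHex, suffix));
        }
    }
    ```
    
    With a job ID of `0123456789abcdef0123456789abcdef` and suffix `abc`,
    `directoryName` yields `savepoint-012345-abc`.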
    
    It's important to make sure that this is in line with the upcoming changes
    for incremental checkpoints (discussion on the mailing list) and FLINK-5820.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink selfcontained

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3346
    
----
commit 64e1f8892597dd26489774fe494fd7b211d789a9
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-15T16:52:40Z

    [FLINK-5763] [checkpoints] Acknowledge with explicit ID and CheckpointMetrics
    
    Instead of acknowledging checkpoints with the CheckpointMetaData make
    the acknowledgement explicit by ID and CheckpointMetrics. The rest is
    not needed.

commit 0b8fa02a150ae6d5891e04d1fce6de748a283aaf
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-15T17:16:44Z

    [FLINK-5763] [checkpoints] Move CheckpointMetrics out of CheckpointMetaData

commit d023159d0fef1da24373d7880e11d0a36afbd7ef
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T15:52:32Z

    [FLINK-5763] [checkpoints] Add isSavepoint() to CheckpointProperties

commit 2496068371bf1aac9e2ce6223002c1d9a043930f
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T16:56:23Z

    [FLINK-5763] [checkpoints] Add CheckpointOptions
    
    Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
    to barrier injecting tasks) and barriers (flowing inline with the data):
    
    ```java
    public class CheckpointOptions {
    
      // Type of checkpoint
      // => FULL_CHECKPOINT
      // => SAVEPOINT
      @NonNull
      CheckpointType getCheckpointType();
    
      // Custom target location. This is a String, because for future
      // backends it can be a logical location like a DB table.
      @Nullable
      String getTargetLocation();
    
    }
    ```
    
    This class would be the place to define more options for performing the
    checkpoints (for example for incremental checkpoints).
    
    These options are forwarded via the `StreamTask` to the `StreamOperator`s and
    `Snapshotable` backends. The `AbstractStreamOperator` checks the options and
    either i) forwards the shared per operator `CheckpointStreamFactory` (as of
    #3312), or ii) creates a custom savepoint stream factory (one per savepoint).
    
    For this, the state backends provide the following new method:
    
    ```
    CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
    ```
    
    The `MemoryStateBackend` returns the regular stream factory and the
    `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
    checkpoint streams to a single directory (instead of the regular sub folders
    per checkpoint).
    
    We end up with the following directory layout for savepoints:
    
    ```
    +---------------------------+
    | :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
    +---------------------------+
      | +---------------------------------------+
      +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
        +---------------------------------------+
           |
           +- _metadata (one per savepoint)
           +- :uuid (one data file per StreamTask)
           +- ...
           +- :uuid
    ```

commit ca80d68ab7faba663f4c384156a90d58de0ebf82
Author: Ufuk Celebi <u...@apache.org>
Date:   2017-02-16T16:56:37Z

    [FLINK-5763] [checkpoints] Adjust tests

----


> Extend State Backend Abstraction to support Global Cleanup Hooks
> ----------------------------------------------------------------
>
>                 Key: FLINK-5820
>                 URL: https://issues.apache.org/jira/browse/FLINK-5820
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues:
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily and
> the checkpoint metadata was already disposed.
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes the responsibility of the state
> backend, not the "completed checkpoint store". The latter only stores the
> pointers to the latest available checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5. The StateBackend needs to have a hook to drop all checkpointed state up
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which currently stores most of the checkpointed
> state (transitively for RocksDB as well), this means restructuring
> the storage directories as follows:
> {code}
> ../<flink-checkpoints>/job1-id/
>                               /shared/    <-- shared checkpoint data
>                               /chk-1/...  <-- data exclusive to checkpoint 1
>                               /chk-2/...  <-- data exclusive to checkpoint 2
>                               /chk-3/...  <-- data exclusive to checkpoint 3
> ../<flink-checkpoints>/job2-id/
>                               /shared/...
>                               /chk-1/...
>                               /chk-2/...
>                               /chk-3/...
> ../<flink-savepoints>/savepoint-1/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
>                      /savepoint-2/savepoint-root
>                                  /file-1-uid
>                                  /file-2-uid
>                                  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
