Ufuk Celebi created FLINK-5777:
----------------------------------

             Summary: Pass savepoint information to CheckpointingOperation
                 Key: FLINK-5777
                 URL: https://issues.apache.org/jira/browse/FLINK-5777
             Project: Flink
          Issue Type: Sub-task
          Components: State Backends, Checkpointing
            Reporter: Ufuk Celebi
            Assignee: Ufuk Celebi


In order to make savepoints self-contained in a single directory, we need to 
pass some additional information to {{StreamTask#CheckpointingOperation}}.

I propose extending {{CheckpointMetaData}} for this.

There is currently some overlap between CheckpointMetaData, CheckpointMetrics, 
and the manually passed checkpoint ID and checkpoint timestamp. We should 
restrict CheckpointMetaData to the core meta data that needs to be passed to 
StreamTask#CheckpointingOperation.

This means that we move CheckpointMetrics out of CheckpointMetaData. Instead, 
the BarrierBuffer/BarrierTracker create the CheckpointMetrics separately, and 
they are sent back with the acknowledge message.
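A minimal sketch of the proposed split, assuming a simplified model: CheckpointMetrics becomes its own value object and the acknowledgement carries (checkpoint ID, metrics) rather than a CheckpointMetaData. The class shapes, the two metric fields, and the AckDemo helper are hypothetical illustrations, not Flink's actual API.

```java
// Hypothetical, simplified model of the proposed split; not Flink's actual classes.
// The two metric fields are illustrative assumptions.
final class CheckpointMetrics {
    private final long alignmentDurationNanos;
    private final long bytesBufferedInAlignment;

    CheckpointMetrics(long alignmentDurationNanos, long bytesBufferedInAlignment) {
        this.alignmentDurationNanos = alignmentDurationNanos;
        this.bytesBufferedInAlignment = bytesBufferedInAlignment;
    }

    long getAlignmentDurationNanos() { return alignmentDurationNanos; }
    long getBytesBufferedInAlignment() { return bytesBufferedInAlignment; }
}

// The acknowledgement carries the checkpoint ID plus metrics, no CheckpointMetaData.
final class AcknowledgeCheckpoint {
    private final long checkpointId;
    private final CheckpointMetrics metrics;

    AcknowledgeCheckpoint(long checkpointId, CheckpointMetrics metrics) {
        this.checkpointId = checkpointId;
        this.metrics = metrics;
    }

    long getCheckpointId() { return checkpointId; }
    CheckpointMetrics getMetrics() { return metrics; }
}

final class AckDemo {
    // Hypothetical helper: in this simplified model, the component that measured
    // the alignment (BarrierBuffer/BarrierTracker) builds the metrics, and they
    // are attached to the acknowledgement together with the checkpoint ID.
    static AcknowledgeCheckpoint acknowledge(long checkpointId, long alignmentStartNanos,
                                             long alignmentEndNanos, long bytesBuffered) {
        CheckpointMetrics metrics =
            new CheckpointMetrics(alignmentEndNanos - alignmentStartNanos, bytesBuffered);
        return new AcknowledgeCheckpoint(checkpointId, metrics);
    }

    public static void main(String[] args) {
        AcknowledgeCheckpoint ack = acknowledge(42L, 1_000L, 4_500L, 128L);
        System.out.println("ack " + ack.getCheckpointId()
            + ", alignment " + ack.getMetrics().getAlignmentDurationNanos() + " ns");
    }
}
```

Keeping the metrics out of the meta data means CheckpointMetaData only travels downstream (coordinator to task) and CheckpointMetrics only travels upstream (task to coordinator).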

CheckpointMetaData should be extended with the following properties:
- boolean isSavepoint
- String targetDirectory
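A sketch of what the extended CheckpointMetaData could look like, assuming it keeps a checkpoint ID and timestamp alongside the two proposed properties. The forCheckpoint/forSavepoint factory methods and the null check on the target directory are hypothetical additions to make the invariant explicit; they are not taken from Flink.

```java
import java.util.Objects;

// Hypothetical sketch of the extended CheckpointMetaData. The two new fields
// follow the proposal; the factory methods and the null check are assumptions.
final class CheckpointMetaData {
    private final long checkpointId;
    private final long timestamp;
    private final boolean isSavepoint;
    private final String targetDirectory; // null for regular checkpoints

    private CheckpointMetaData(long checkpointId, long timestamp,
                               boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }

    static CheckpointMetaData forCheckpoint(long checkpointId, long timestamp) {
        return new CheckpointMetaData(checkpointId, timestamp, false, null);
    }

    static CheckpointMetaData forSavepoint(long checkpointId, long timestamp, String targetDirectory) {
        // A savepoint without a target directory cannot be made self-contained.
        Objects.requireNonNull(targetDirectory, "savepoints require a target directory");
        return new CheckpointMetaData(checkpointId, timestamp, true, targetDirectory);
    }

    long getCheckpointId() { return checkpointId; }
    long getTimestamp() { return timestamp; }
    boolean isSavepoint() { return isSavepoint; }
    String getTargetDirectory() { return targetDirectory; }
}

final class MetaDataDemo {
    public static void main(String[] args) {
        CheckpointMetaData sp = CheckpointMetaData.forSavepoint(2L, 200L, "/savepoints/sp-2");
        System.out.println(sp.isSavepoint() + " " + sp.getTargetDirectory());
    }
}
```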

There are two code paths that lead to the CheckpointingOperation:

1. From CheckpointCoordinator via RPC to StreamTask#triggerCheckpoint
- Execution#triggerCheckpoint(long, long) => triggerCheckpoint(CheckpointMetaData)
- TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, long, long) 
  => TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, CheckpointMetaData)
- Task#triggerCheckpointBarrier(long, long) => Task#triggerCheckpointBarrier(CheckpointMetaData)
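The RPC path above can be sketched as a chain in which every hop forwards the whole CheckpointMetaData instead of (long, long). This is a heavily simplified model under stated assumptions: IDs are plain Strings, the RPC is replaced by an in-process LocalGateway, and all *Stub classes and RpcPathDemo are hypothetical stand-ins for Execution, Task, and StreamTask.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the RPC trigger path after the change.
// IDs are plain Strings and the *Stub classes are illustrative stand-ins, not Flink classes.
final class CheckpointMetaData {
    final long checkpointId;
    final long timestamp;
    final boolean isSavepoint;
    final String targetDirectory;

    CheckpointMetaData(long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }
}

// Stand-in for StreamTask: records the metadata that would reach the CheckpointingOperation.
final class StreamTaskStub {
    final List<CheckpointMetaData> triggered = new ArrayList<>();

    boolean triggerCheckpoint(CheckpointMetaData metaData) {
        triggered.add(metaData);
        return true;
    }
}

// Stand-in for Task; calls the invokable synchronously here for simplicity.
final class TaskStub {
    private final StreamTaskStub invokable;

    TaskStub(StreamTaskStub invokable) { this.invokable = invokable; }

    void triggerCheckpointBarrier(CheckpointMetaData metaData) {
        invokable.triggerCheckpoint(metaData);
    }
}

interface TaskManagerGateway {
    void triggerCheckpoint(String executionAttemptId, String jobId, CheckpointMetaData metaData);
}

// In-process gateway standing in for the actual RPC.
final class LocalGateway implements TaskManagerGateway {
    private final Map<String, TaskStub> tasksByAttempt;

    LocalGateway(Map<String, TaskStub> tasksByAttempt) { this.tasksByAttempt = tasksByAttempt; }

    @Override
    public void triggerCheckpoint(String executionAttemptId, String jobId, CheckpointMetaData metaData) {
        TaskStub task = tasksByAttempt.get(executionAttemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(metaData);
        }
    }
}

// Stand-in for Execution: forwards the whole CheckpointMetaData instead of (long, long).
final class ExecutionStub {
    private final String attemptId;
    private final String jobId;
    private final TaskManagerGateway gateway;

    ExecutionStub(String attemptId, String jobId, TaskManagerGateway gateway) {
        this.attemptId = attemptId;
        this.jobId = jobId;
        this.gateway = gateway;
    }

    void triggerCheckpoint(CheckpointMetaData metaData) {
        gateway.triggerCheckpoint(attemptId, jobId, metaData);
    }
}

final class RpcPathDemo {
    // Wires Execution -> gateway -> Task -> StreamTask and returns what StreamTask saw.
    static List<CheckpointMetaData> triggerThroughRpcPath(CheckpointMetaData metaData) {
        StreamTaskStub streamTask = new StreamTaskStub();
        Map<String, TaskStub> tasks = new HashMap<>();
        tasks.put("attempt-1", new TaskStub(streamTask));
        new ExecutionStub("attempt-1", "job-1", new LocalGateway(tasks)).triggerCheckpoint(metaData);
        return streamTask.triggered;
    }

    public static void main(String[] args) {
        CheckpointMetaData sp = new CheckpointMetaData(7L, 700L, true, "/savepoints/sp-7");
        System.out.println(triggerThroughRpcPath(sp).get(0).targetDirectory);
    }
}
```

The design point is that adding a field to CheckpointMetaData later no longer requires touching every signature along this chain.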

2. From intermediate streams via the CheckpointBarrier to 
StreamTask#triggerCheckpointOnBarrier
- triggerCheckpointOnBarrier(CheckpointMetaData) 
  => triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)
- CheckpointBarrier(long, long) => CheckpointBarrier(CheckpointMetaData)
- AcknowledgeCheckpoint(CheckpointMetaData) => AcknowledgeCheckpoint(long, CheckpointMetrics)
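The barrier path can be sketched in the same spirit: the barrier carries CheckpointMetaData downstream, the aligner measures alignment and builds CheckpointMetrics, and the task acknowledges with (checkpoint ID, metrics). AlignerStub is a much-simplified, hypothetical stand-in for BarrierBuffer (one checkpoint at a time, barriers assumed to arrive in checkpoint order, no cancellation), and time is passed in explicitly so the example is deterministic.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the barrier path; not Flink's actual classes.
final class CheckpointMetaData {
    final long checkpointId;
    final long timestamp;
    final boolean isSavepoint;
    final String targetDirectory;

    CheckpointMetaData(long checkpointId, long timestamp, boolean isSavepoint, String targetDirectory) {
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.isSavepoint = isSavepoint;
        this.targetDirectory = targetDirectory;
    }
}

final class CheckpointMetrics {
    final long alignmentDurationNanos;

    CheckpointMetrics(long alignmentDurationNanos) { this.alignmentDurationNanos = alignmentDurationNanos; }
}

// The barrier now carries the full meta data instead of (long, long).
final class CheckpointBarrier {
    final CheckpointMetaData metaData;

    CheckpointBarrier(CheckpointMetaData metaData) { this.metaData = metaData; }
}

final class AcknowledgeCheckpoint {
    final long checkpointId;
    final CheckpointMetrics metrics;

    AcknowledgeCheckpoint(long checkpointId, CheckpointMetrics metrics) {
        this.checkpointId = checkpointId;
        this.metrics = metrics;
    }
}

// Stand-in for StreamTask#triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics).
final class BarrierTaskStub {
    AcknowledgeCheckpoint lastAck;

    void triggerCheckpointOnBarrier(CheckpointMetaData metaData, CheckpointMetrics metrics) {
        // The CheckpointingOperation would run here, using metaData.isSavepoint / targetDirectory.
        lastAck = new AcknowledgeCheckpoint(metaData.checkpointId, metrics);
    }
}

// Much-simplified stand-in for BarrierBuffer: aligns one checkpoint at a time.
final class AlignerStub {
    private final int numChannels;
    private final BarrierTaskStub task;
    private final Set<Integer> seenChannels = new HashSet<>();
    private long currentCheckpointId = -1;
    private long alignmentStartNanos;

    AlignerStub(int numChannels, BarrierTaskStub task) {
        this.numChannels = numChannels;
        this.task = task;
    }

    void processBarrier(CheckpointBarrier barrier, int channel, long nowNanos) {
        long id = barrier.metaData.checkpointId;
        if (id != currentCheckpointId) {
            // First barrier of a new checkpoint: start alignment.
            currentCheckpointId = id;
            seenChannels.clear();
            alignmentStartNanos = nowNanos;
        }
        seenChannels.add(channel);
        if (seenChannels.size() == numChannels) {
            // All inputs aligned: the aligner creates the metrics separately.
            CheckpointMetrics metrics = new CheckpointMetrics(nowNanos - alignmentStartNanos);
            task.triggerCheckpointOnBarrier(barrier.metaData, metrics);
            seenChannels.clear();
        }
    }
}

final class BarrierPathDemo {
    public static void main(String[] args) {
        BarrierTaskStub task = new BarrierTaskStub();
        AlignerStub aligner = new AlignerStub(2, task);
        CheckpointBarrier barrier = new CheckpointBarrier(new CheckpointMetaData(3L, 300L, false, null));
        aligner.processBarrier(barrier, 0, 1_000L);
        aligner.processBarrier(barrier, 1, 1_750L);
        System.out.println(task.lastAck.checkpointId + " " + task.lastAck.metrics.alignmentDurationNanos);
    }
}
```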

The state backends provide an additional stream factory that 
CheckpointingOperation uses when the meta data indicates a savepoint. Each state 
backend can choose whether to return its regular checkpoint stream factory in 
that case or a special one for savepoints. That way, backends that don't 
checkpoint to a file system can easily special-case savepoints.

- FsStateBackend: return a special FsCheckpointStreamFactory with a different 
directory layout
- MemoryStateBackend: return the regular checkpoint stream factory 
(MemCheckpointStreamFactory) => the _metadata file will contain all state, since 
the state handles are part of it
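A minimal sketch of the factory selection described above, assuming a default method on the backend interface that falls back to the regular checkpoint factory. All names here (createSavepointStreamFactory, FsBackend, MemoryBackend, FactorySelection, describe) are hypothetical, and the FsBackend override simply points the factory at the target directory rather than modeling the actual directory layout.

```java
// Hypothetical sketch; names are illustrative, not Flink's real API.
interface CheckpointStreamFactory {
    String describe(); // stand-in for the methods that open checkpoint output streams
}

final class MemStreamFactory implements CheckpointStreamFactory {
    public String describe() { return "memory"; }
}

final class FsStreamFactory implements CheckpointStreamFactory {
    private final String baseDirectory;

    FsStreamFactory(String baseDirectory) { this.baseDirectory = baseDirectory; }

    public String describe() { return "fs:" + baseDirectory; }
}

interface StateBackend {
    CheckpointStreamFactory createStreamFactory();

    // Default: savepoints reuse the regular checkpoint stream factory.
    default CheckpointStreamFactory createSavepointStreamFactory(String targetDirectory) {
        return createStreamFactory();
    }
}

// Keeps the default, so savepoint state ends up inline in the _metadata file.
final class MemoryBackend implements StateBackend {
    public CheckpointStreamFactory createStreamFactory() { return new MemStreamFactory(); }
}

final class FsBackend implements StateBackend {
    private final String checkpointDirectory;

    FsBackend(String checkpointDirectory) { this.checkpointDirectory = checkpointDirectory; }

    public CheckpointStreamFactory createStreamFactory() {
        return new FsStreamFactory(checkpointDirectory);
    }

    // Special case: write savepoint state into the savepoint's own directory.
    @Override
    public CheckpointStreamFactory createSavepointStreamFactory(String targetDirectory) {
        return new FsStreamFactory(targetDirectory);
    }
}

final class FactorySelection {
    // What CheckpointingOperation would do based on the meta data flags.
    static CheckpointStreamFactory select(StateBackend backend, boolean isSavepoint, String targetDirectory) {
        return isSavepoint
            ? backend.createSavepointStreamFactory(targetDirectory)
            : backend.createStreamFactory();
    }

    public static void main(String[] args) {
        System.out.println(select(new FsBackend("/checkpoints"), true, "/savepoints/sp-1").describe());
    }
}
```

The default method keeps backends that don't write to a file system unchanged; only backends that need a different layout override it.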

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)