Ufuk Celebi created FLINK-5777:
----------------------------------
Summary: Pass savepoint information to CheckpointingOperation
Key: FLINK-5777
URL: https://issues.apache.org/jira/browse/FLINK-5777
Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
In order to make savepoints self contained in a single directory, we need to
pass some information to {{StreamTask#CheckpointingOperation}}.
I propose to extend the {{CheckpointMetaData}} for this.
We currently have some overlap with CheckpointMetaData, CheckpointMetrics, and
manually passed checkpoint ID and checkpoint timestamps. We should restrict
CheckpointMetaData to the integral meta data that needs to be passed to
StreamTask#CheckpointingOperation.
This means that we move the CheckpointMetrics out of the CheckpointMetaData and
the BarrierBuffer/BarrierTracker create CheckpointMetrics separately and send
it back with the acknowledge message.
CheckpointMetaData should be extended with the following properties:
- boolean isSavepoint
- String targetDirectory
There are two code paths that lead to the CheckpointingOperation:
1. From CheckpointCoordinator via RPC to StreamTask#triggerCheckpoint
- Execution#triggerCheckpoint(long, long)
=> triggerCheckpoint(CheckpointMetaData)
- TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID, long, long)
=> TaskManagerGateway#triggerCheckpoint(ExecutionAttemptID, JobID,
CheckpointMetaData)
- Task#triggerCheckpointBarrier(long, long) =>
Task#triggerCheckpointBarrier(CheckpointMetaData)
2. From intermediate streams via the CheckpointBarrier to
StreamTask#triggerCheckpointOnBarrier
- triggerCheckpointOnBarrier(CheckpointMetaData)
=> triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)
- CheckpointBarrier(long, long) => CheckpointBarrier(CheckpointMetaData)
- AcknowledgeCheckpoint(CheckpointMetaData)
=> AcknowledgeCheckpoint(long, CheckpointMetrics)
The state backends provide another stream factory that is called in
CheckpointingOperation when the meta data indicates savepoint. The state
backends can choose whether they return the regular checkpoint stream factory
in that case or a special one for savepoints. That way backends that don’t
checkpoint to a file system can special case savepoints easily.
- FsStateBackend: return special FsCheckpointStreamFactory with different
directory layout
- MemoryStateBackend: return regular checkpoint stream factory
(MemCheckpointStreamFactory) => The _metadata file will contain all state as
the state handles are part of it
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)