[
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Feifan Wang updated FLINK-23949:
--------------------------------
Description:
In RocksIncrementalSnapshotStrategy we record the uploaded RocksDB files
for each checkpoint ID and clean them up in
_CheckpointListener#notifyCheckpointComplete_:
{code:java}
@Override
public void notifyCheckpointComplete(long completedCheckpointId) {
    synchronized (materializedSstFiles) {
        if (completedCheckpointId > lastCompletedCheckpointId) {
            materializedSstFiles
                    .keySet()
                    .removeIf(checkpointId -> checkpointId < completedCheckpointId);
            lastCompletedCheckpointId = completedCheckpointId;
        }
    }
}
{code}
This works well without savepoints, but when a savepoint completes, it
cleans up the _materializedSstFiles_ entries of the previous checkpoints. As a
result, the first checkpoint after the savepoint must upload all files in RocksDB.
!image-2021-08-25-00-59-05-779.png|width=1188,height=163!
The fix is simple: I propose to clean _materializedSstFiles_ and update
_lastCompletedCheckpointId_ only when
{color:#ff0000}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}.
If a _completedCheckpointId_ is not in _materializedSstFiles.keySet()_,
there are only two cases:
1. It is a checkpoint, but another checkpoint triggered after it has already completed.
2. It is a savepoint (savepoints are not produced by RocksIncrementalSnapshotStrategy).
In either case we do not need to clean _materializedSstFiles_ or update
_lastCompletedCheckpointId_.
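The proposed guard can be illustrated with a small standalone sketch. This is not the Flink class itself: the class name MaterializedSstFilesSketch, the helper methods recordUpload and trackedCheckpointIds, and the use of plain strings for SST file names are all assumptions made for illustration. Keeping the existing _completedCheckpointId > lastCompletedCheckpointId_ check alongside the new condition is also an assumption.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for the bookkeeping described above; not Flink code.
class MaterializedSstFilesSketch {
    // Checkpoint ID -> SST files uploaded for that checkpoint (simplified to names).
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;

    // Hypothetical helper: called when an incremental checkpoint uploads its files.
    void recordUpload(long checkpointId, Set<String> sstFiles) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles);
        }
    }

    // Proposed behavior: only clean up when the completed ID is one we tracked.
    // A savepoint ID is never recorded here, so its completion is ignored.
    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            if (completedCheckpointId > lastCompletedCheckpointId
                    && materializedSstFiles.containsKey(completedCheckpointId)) {
                materializedSstFiles
                        .keySet()
                        .removeIf(checkpointId -> checkpointId < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    // Hypothetical helper: snapshot of the checkpoint IDs still tracked.
    Set<Long> trackedCheckpointIds() {
        synchronized (materializedSstFiles) {
            return new TreeSet<>(materializedSstFiles.keySet());
        }
    }

    long lastCompletedCheckpointId() {
        synchronized (materializedSstFiles) {
            return lastCompletedCheckpointId;
        }
    }
}
```

In this sketch, if checkpoint 1 is recorded and completed and a savepoint with ID 2 then completes, the entry for checkpoint 1 stays in place, so a following incremental checkpoint 3 still has a base to diff against. Once checkpoint 3 is recorded and completed, the entry for checkpoint 1 is removed as before.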
> first incremental checkpoint after a savepoint will degenerate into a full
> checkpoint
> -------------------------------------------------------------------------------------
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.11.4, 1.12.5, 1.13.2
> Reporter: Feifan Wang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)