[
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-23949:
-----------------------------------
Labels: pull-request-available (was: )
> first incremental checkpoint after a savepoint will degenerate into a full
> checkpoint
> -------------------------------------------------------------------------------------
>
> Key: FLINK-23949
> URL: https://issues.apache.org/jira/browse/FLINK-23949
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / State Backends
> Affects Versions: 1.11.4, 1.12.5, 1.13.2
> Reporter: Feifan Wang
> Priority: Major
> Labels: pull-request-available
> Attachments: image-2021-08-25-00-59-05-779.png
>
>
> RocksIncrementalSnapshotStrategy records the uploaded RocksDB files for each
> checkpoint id in _materializedSstFiles_ and cleans up older entries in
> _CheckpointListener#notifyCheckpointComplete_.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }
> {code}
>
> This works well when no savepoint is taken. When a savepoint completes,
> however, the same callback runs with the savepoint's id and removes the
> _materializedSstFiles_ entries of the previous checkpoint. As a result, the
> first incremental checkpoint after the savepoint has to upload all RocksDB
> files.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> The fix is simple: I propose cleaning _materializedSstFiles_ and updating
> _lastCompletedCheckpointId_ only when
> {color:#FF0000}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId
>                 && materializedSstFiles.keySet().contains(completedCheckpointId)) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }
> {code}
>
>
>
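The difference between the two versions can be reproduced outside Flink with a small stand-alone sketch. Everything below is illustrative and not taken from the Flink code base: the class SstFileTracker, the methods recordUpload and reusableFiles, and the flag requireKnownCheckpoint are hypothetical names. The sketch also assumes that the next incremental checkpoint looks up its base files under lastCompletedCheckpointId, and that a savepoint never registers files in materializedSstFiles.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the bookkeeping described in this issue. */
class SstFileTracker {
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    // false = current behavior, true = behavior proposed in this issue
    private final boolean requireKnownCheckpoint;

    SstFileTracker(boolean requireKnownCheckpoint) {
        this.requireKnownCheckpoint = requireKnownCheckpoint;
    }

    /** Assumption: only incremental checkpoints register files; savepoints do not. */
    void recordUpload(long checkpointId, Set<String> files) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, files);
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            boolean known = materializedSstFiles.containsKey(completedCheckpointId);
            if (completedCheckpointId > lastCompletedCheckpointId
                    && (!requireKnownCheckpoint || known)) {
                // Drop the entries of all checkpoints older than the completed one.
                materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    /** Base files for the next incremental checkpoint, or null if there is no base. */
    Set<String> reusableFiles() {
        synchronized (materializedSstFiles) {
            return materializedSstFiles.get(lastCompletedCheckpointId);
        }
    }
}
```

Under these assumptions, take checkpoint 1 with one uploaded file, followed by a savepoint with id 2. With requireKnownCheckpoint set to false, reusableFiles() returns null after the savepoint completes, so the next checkpoint has no base and must upload everything. With the flag set to true, the notification for id 2 is ignored and the files of checkpoint 1 remain available.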
--
This message was sent by Atlassian Jira
(v8.3.4#803005)