[ 
https://issues.apache.org/jira/browse/FLINK-23949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-23949:
-----------------------------------
    Labels: pull-request-available  (was: )

> first incremental checkpoint after a savepoint will degenerate into a full 
> checkpoint
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-23949
>                 URL: https://issues.apache.org/jira/browse/FLINK-23949
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.11.4, 1.12.5, 1.13.2
>            Reporter: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-08-25-00-59-05-779.png
>
>
> In RocksIncrementalSnapshotStrategy we record the uploaded RocksDB files 
> for each checkpoint id, and clean them up in 
> _CheckpointListener#notifyCheckpointComplete_.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }
> {code}
>  
> This works well when no savepoint is taken. A savepoint, however, records no 
> entry in _materializedSstFiles_, so when a savepoint completes, this method 
> removes the entry of the previous checkpoint. As a result, the first 
> incremental checkpoint after the savepoint has no base files to reuse and 
> must upload all RocksDB files.
> !image-2021-08-25-00-59-05-779.png|width=1188,height=163!
> The fix is simple: I propose to clean up _materializedSstFiles_ and update 
> _lastCompletedCheckpointId_ only when 
> {color:#FF0000}_materializedSstFiles.keySet().contains(completedCheckpointId)_{color}.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long completedCheckpointId) {
>     synchronized (materializedSstFiles) {
>         if (completedCheckpointId > lastCompletedCheckpointId
>                 && materializedSstFiles.keySet().contains(completedCheckpointId)) {
>             materializedSstFiles
>                     .keySet()
>                     .removeIf(checkpointId -> checkpointId < completedCheckpointId);
>             lastCompletedCheckpointId = completedCheckpointId;
>         }
>     }
> }
> {code}
>  
>  
>  
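
To make the difference between the two versions of notifyCheckpointComplete concrete, here is a minimal standalone sketch. It is not Flink code: the class SstFileTracker, the methods recordUpload, reusableBase and reusableAfterSavepoint, and the flag requireKnownCheckpoint are hypothetical names invented for this illustration. It also assumes that the next incremental checkpoint looks up its base files under lastCompletedCheckpointId, and that a savepoint never records an entry in materializedSstFiles.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical stand-in for the bookkeeping described in the issue; not a Flink class.
class SstFileTracker {
    // checkpoint id -> SST files uploaded for that checkpoint
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();
    private long lastCompletedCheckpointId = -1L;
    // false = behaviour quoted in the issue, true = behaviour proposed in the issue
    private final boolean requireKnownCheckpoint;

    SstFileTracker(boolean requireKnownCheckpoint) {
        this.requireKnownCheckpoint = requireKnownCheckpoint;
    }

    // Assumption: only incremental checkpoints call this; savepoints do not.
    void recordUpload(long checkpointId, Set<String> files) {
        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, files);
        }
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        synchronized (materializedSstFiles) {
            boolean known = materializedSstFiles.containsKey(completedCheckpointId);
            if (completedCheckpointId > lastCompletedCheckpointId
                    && (known || !requireKnownCheckpoint)) {
                materializedSstFiles.keySet().removeIf(id -> id < completedCheckpointId);
                lastCompletedCheckpointId = completedCheckpointId;
            }
        }
    }

    // Files the next incremental checkpoint could reuse instead of uploading again.
    Set<String> reusableBase() {
        synchronized (materializedSstFiles) {
            Set<String> base = materializedSstFiles.get(lastCompletedCheckpointId);
            return base == null ? Collections.<String>emptySet() : base;
        }
    }

    // Scenario: checkpoint 1 completes, then savepoint 2 completes.
    static int reusableAfterSavepoint(boolean requireKnownCheckpoint) {
        SstFileTracker tracker = new SstFileTracker(requireKnownCheckpoint);
        tracker.recordUpload(1L, new HashSet<>(Arrays.asList("000001.sst", "000002.sst")));
        tracker.notifyCheckpointComplete(1L);
        tracker.notifyCheckpointComplete(2L); // savepoint: no entry was recorded for id 2
        return tracker.reusableBase().size();
    }

    public static void main(String[] args) {
        System.out.println("original: " + reusableAfterSavepoint(false)); // prints "original: 0"
        System.out.println("proposed: " + reusableAfterSavepoint(true));  // prints "proposed: 2"
    }
}
```

In this model the original condition leaves nothing to reuse after the savepoint, so the next checkpoint would upload every file, while the proposed condition keeps the two files recorded for checkpoint 1.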



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
