[ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993340#comment-15993340 ]

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114361888
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException {
                }
        }
     
    +   private static class RocksDBIncrementalSnapshotOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +           private final long checkpointId;
    +
    +           private final long checkpointTimestamp;
    +
    +           private Map<String, StreamStateHandle> baseSstFiles;
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos = new ArrayList<>();
    +
    +           private FileSystem backupFileSystem;
    +           private Path backupPath;
    +
    +           private FSDataInputStream inputStream = null;
    +           private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +           // new sst files since the last completed checkpoint
    +           private Set<String> newSstFileNames = new HashSet<>();
    +
    +           // handles to the sst files in the current snapshot
    +           private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +           // handles to the misc files in the current snapshot
    +           private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +           private StreamStateHandle metaStateHandle = null;
    +
    +           private RocksDBIncrementalSnapshotOperation(
    +                           RocksDBKeyedStateBackend<?> stateBackend,
    +                           CheckpointStreamFactory checkpointStreamFactory,
    +                           long checkpointId,
    +                           long checkpointTimestamp) {
    +
    +                   this.stateBackend = stateBackend;
    +                   this.checkpointStreamFactory = checkpointStreamFactory;
    +                   this.checkpointId = checkpointId;
    +                   this.checkpointTimestamp = checkpointTimestamp;
    +           }
    +
    +           private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +                   try {
    +                           final byte[] buffer = new byte[1024];
    +
    +                           FileSystem backupFileSystem = backupPath.getFileSystem();
    +                           inputStream = backupFileSystem.open(filePath);
    +                           stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = checkpointStreamFactory
    +                                   .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +
    +                           return outputStream.closeAndGetHandle();
    --- End diff --
    
    We could set `outputStream` to `null` before returning, to avoid a second call to `close()` in the `finally` clause.
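The suggestion above can be illustrated with a minimal, self-contained sketch that does not use Flink. `CountingOutputStream` is a hypothetical stand-in for `CheckpointStateOutputStream`. Its `closeAndGetHandle()` closes the stream and returns the written bytes as the "handle". `NullBeforeReturnSketch.materialize` is a hypothetical simplification of `materializeStateData`. Because the field is set to `null` once `closeAndGetHandle()` succeeds, the `finally` clause closes the stream only if an exception occurred earlier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class NullBeforeReturnSketch {

    /** Hypothetical stand-in for a checkpoint output stream; counts how often close() is called. */
    static class CountingOutputStream extends OutputStream {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int closeCalls = 0;

        @Override
        public void write(int b) {
            buffer.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) {
            buffer.write(b, off, len);
        }

        /** Closes the stream and returns the written bytes as a stand-in "handle". */
        byte[] closeAndGetHandle() {
            close();
            return buffer.toByteArray();
        }

        @Override
        public void close() {
            closeCalls++;
        }
    }

    // Mirrors the field in the reviewed code: the stream currently being written, or null.
    private CountingOutputStream outputStream = null;

    byte[] materialize(InputStream in, CountingOutputStream out) throws IOException {
        try {
            outputStream = out;
            final byte[] buffer = new byte[1024];
            int numBytes;
            while ((numBytes = in.read(buffer)) != -1) {
                outputStream.write(buffer, 0, numBytes);
            }
            byte[] handle = outputStream.closeAndGetHandle();
            // The stream is already closed; clear the field so finally does not close it again.
            outputStream = null;
            return handle;
        } finally {
            // Non-null only if an exception was thrown before the field was cleared.
            if (outputStream != null) {
                outputStream.close();
                outputStream = null;
            }
        }
    }
}
```

With this pattern, a successful copy calls `close()` exactly once. Without the `outputStream = null;` line, the counter would reach two.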


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is based on LSM trees, which record updates in new sst files, and all 
> sst files are immutable. By materializing only the new sst files, we can 
> significantly improve checkpointing performance.
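The idea in the issue description can be sketched as a partition of the sst files in the current snapshot. Files already uploaded by the last completed checkpoint keep their existing handles, and only the remaining ones are materialized. This is a minimal sketch under stated assumptions, not Flink's implementation. `IncrementalSstSelection`, `Plan`, and `plan` are hypothetical names, and handles are modeled as plain `String`s instead of `StreamStateHandle`. The maps loosely mirror the `baseSstFiles` and `newSstFileNames` fields in the diff.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: decide which sst files an incremental checkpoint must upload. */
class IncrementalSstSelection {

    /** Files to materialize now, plus handles reused from the last completed checkpoint. */
    static final class Plan {
        final Set<String> newSstFileNames = new HashSet<>();
        final Map<String, String> reusedHandles = new HashMap<>();
    }

    /**
     * @param baseSstFiles    sst file name -> handle, from the last completed checkpoint
     * @param currentSstFiles sst file names present in the current RocksDB snapshot
     */
    static Plan plan(Map<String, String> baseSstFiles, Set<String> currentSstFiles) {
        Plan plan = new Plan();
        for (String fileName : currentSstFiles) {
            String existingHandle = baseSstFiles.get(fileName);
            if (existingHandle != null) {
                // sst files are immutable, so an already-uploaded file can be referenced again
                plan.reusedHandles.put(fileName, existingHandle);
            } else {
                // created since the last completed checkpoint: must be copied to checkpoint storage
                plan.newSstFileNames.add(fileName);
            }
        }
        return plan;
    }
}
```

Suppose the last checkpoint uploaded `000001.sst` and `000002.sst`, and compaction has since replaced `000001.sst` with `000003.sst`. In that case only `000003.sst` is copied, and `000002.sst` reuses its existing handle.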



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
