[ 
https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15992542#comment-15992542
 ] 

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114270508
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws 
InterruptedException {
                }
        }
     
    +   private static class RocksDBIncrementalSnapshotOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +           private final long checkpointId;
    +
    +           private final long checkpointTimestamp;
    +
    +           private Map<String, StreamStateHandle> baseSstFiles;
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> stateMetaInfos = new ArrayList<>();
    +
    +           private FileSystem backupFileSystem;
    +           private Path backupPath;
    +
    +           private FSDataInputStream inputStream = null;
    +           private CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
    +
    +           // new sst files since the last completed checkpoint
    +           private Set<String> newSstFileNames = new HashSet<>();
    +
    +           // handles to the sst files in the current snapshot
    +           private Map<String, StreamStateHandle> sstFiles = new 
HashMap<>();
    +
    +           // handles to the misc files in the current snapshot
    +           private Map<String, StreamStateHandle> miscFiles = new 
HashMap<>();
    +
    +           private StreamStateHandle metaStateHandle = null;
    +
    +           private RocksDBIncrementalSnapshotOperation(
    +                           RocksDBKeyedStateBackend<?> stateBackend,
    +                           CheckpointStreamFactory checkpointStreamFactory,
    +                           long checkpointId,
    +                           long checkpointTimestamp) {
    +
    +                   this.stateBackend = stateBackend;
    +                   this.checkpointStreamFactory = checkpointStreamFactory;
    +                   this.checkpointId = checkpointId;
    +                   this.checkpointTimestamp = checkpointTimestamp;
    +           }
    +
    +           private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
    +                   try {
    +                           final byte[] buffer = new byte[1024];
    +
    +                           FileSystem backupFileSystem = 
backupPath.getFileSystem();
    +                           inputStream = backupFileSystem.open(filePath);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = checkpointStreamFactory
    +                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                                   inputStream = null;
    +                           }
    +
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           private StreamStateHandle materializeMetaData() throws 
Exception {
    +                   try {
    +                           outputStream = checkpointStreamFactory
    +                                   
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           KeyedBackendSerializationProxy 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +                           DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
    +
    +                           serializationProxy.write(out);
    +
    +                           return outputStream.closeAndGetHandle();
    +                   } finally {
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                                   outputStream = null;
    +                           }
    +                   }
    +           }
    +
    +           void takeSnapshot() throws Exception {
    +                   // use the last completed checkpoint as the comparison 
base.
    +                   baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +                   // save meta data
    +                   for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry : 
stateBackend.kvStateInformation.entrySet()) {
    +
    +                           RegisteredBackendStateMetaInfo<?, ?> metaInfo = 
stateMetaInfoEntry.getValue().f1;
    +
    +                           KeyedBackendSerializationProxy.StateMetaInfo<?, 
?> metaInfoProxy =
    +                                   new 
KeyedBackendSerializationProxy.StateMetaInfo<>(
    +                                           metaInfo.getStateType(),
    +                                           metaInfo.getName(),
    +                                           
metaInfo.getNamespaceSerializer(),
    +                                           metaInfo.getStateSerializer());
    +
    +                           stateMetaInfos.add(metaInfoProxy);
    +                   }
    +
    +                   // save state data
    +                   backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +                   backupFileSystem = backupPath.getFileSystem();
    +                   if (backupFileSystem.exists(backupPath)) {
    +                           LOG.warn("Deleting an existing local checkpoint 
directory " +
    +                                   backupPath + ".");
    +
    +                           backupFileSystem.delete(backupPath, true);
    +                   }
    +
    +                   // create hard links of living files in the checkpoint 
path
    +                   Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
    +                   checkpoint.createCheckpoint(backupPath.getPath());
    --- End diff --
    
    I guess this will also create the sst file(s) for the current changes (that 
are in the log). I wonder if this will lead to too many sst files for small 
snapshot intervals after a while.


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because 
> RocksDB is base on LSM trees,  which record updates in new sst files and all 
> sst files are immutable. By only materializing those new sst files, we can 
> significantly improve the performance of checkpointing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to