Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114365154
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws 
IOException, RocksDBException {
                }
        }
     
    +   private static class RocksDBIncrementalRestoreOperation {
    +
    +           private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +           private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +                   this.stateBackend = stateBackend;
    +           }
    +
    +           private List<KeyedBackendSerializationProxy.StateMetaInfo<?, 
?>> readMetaData(
    +                           StreamStateHandle metaStateHandle) throws 
Exception {
    +
    +                   FSDataInputStream inputStream = null;
    +
    +                   try {
    +                           inputStream = metaStateHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           KeyedBackendSerializationProxy 
serializationProxy =
    +                                   new 
KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +                           DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
    +                           serializationProxy.read(in);
    +
    +                           return 
serializationProxy.getNamedStateSerializationProxies();
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                           }
    +                   }
    +           }
    +
    +           private void readStateData(
    +                           Path restoreFilePath,
    +                           StreamStateHandle remoteFileHandle) throws 
IOException {
    +
    +                   FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
    +
    +                   FSDataInputStream inputStream = null;
    +                   FSDataOutputStream outputStream = null;
    +
    +                   try {
    +                           inputStream = 
remoteFileHandle.openInputStream();
    +                           
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +                           outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +                           
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +                           byte[] buffer = new byte[1024];
    +                           while (true) {
    +                                   int numBytes = inputStream.read(buffer);
    +                                   if (numBytes == -1) {
    +                                           break;
    +                                   }
    +
    +                                   outputStream.write(buffer, 0, numBytes);
    +                           }
    +                   } finally {
    +                           if (inputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +                                   inputStream.close();
    +                           }
    +
    +                           if (outputStream != null) {
    +                                   
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +                                   outputStream.close();
    +                           }
    +                   }
    +           }
    +
    +           private void restoreInstance(
    +                           RocksDBKeyedStateHandle restoreStateHandle,
    +                           boolean hasExtraKeys) throws Exception {
    +
    +                   // read state data
    +                   Path restoreInstancePath = new Path(
    +                           stateBackend.instanceBasePath.getAbsolutePath(),
    +                           UUID.randomUUID().toString());
    +
    +                   try {
    +                           Map<String, StreamStateHandle> sstFiles = 
restoreStateHandle.getSstFiles();
    +                           for (Map.Entry<String, StreamStateHandle> 
sstFileEntry : sstFiles.entrySet()) {
    +                                   String fileName = sstFileEntry.getKey();
    +                                   StreamStateHandle remoteFileHandle = 
sstFileEntry.getValue();
    +
    +                                   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
    +                           }
    +
    +                           Map<String, StreamStateHandle> miscFiles = 
restoreStateHandle.getMiscFiles();
    +                           for (Map.Entry<String, StreamStateHandle> 
miscFileEntry : miscFiles.entrySet()) {
    +                                   String fileName = 
miscFileEntry.getKey();
    +                                   StreamStateHandle remoteFileHandle = 
miscFileEntry.getValue();
    +
    +                                   readStateData(new 
Path(restoreInstancePath, fileName), remoteFileHandle);
    +                           }
    +
    +                           // read meta data
    +                           
List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +                                   
readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +                           List<ColumnFamilyDescriptor> 
columnFamilyDescriptors = new ArrayList<>();
    +                           columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +                           for 
(KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : 
stateMetaInfoProxies) {
    +
    +                                   ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +                                           
stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +                                           stateBackend.columnOptions);
    +
    +                                   
columnFamilyDescriptors.add(columnFamilyDescriptor);
    +                           }
    +
    +                           if (hasExtraKeys) {
    +
    +                                   List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>();
    +
    +                                   RocksDB restoreDb = RocksDB.open(
    +                                           stateBackend.dbOptions,
    +                                           restoreInstancePath.getPath(),
    +                                           columnFamilyDescriptors,
    +                                           columnFamilyHandles);
    +
    +                                   for (int i = 1; i < 
columnFamilyHandles.size(); ++i) {
    +                                           ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
    +                                           ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
    +                                           
KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = 
stateMetaInfoProxies.get(i - 1);
    +
    +                                           Tuple2<ColumnFamilyHandle, 
RegisteredBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
    +                                                   
stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName());
    +
    +                                           if (null == 
registeredStateMetaInfoEntry) {
    +
    +                                                   
RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo = new 
RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
    +
    +                                                   
registeredStateMetaInfoEntry =
    +                                                           new 
Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
    +                                                                   
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +                                                                   
stateMetaInfo);
    +
    +                                                   
stateBackend.kvStateInformation.put(stateMetaInfoProxy.getStateName(), 
registeredStateMetaInfoEntry);
    +                                           }
    +
    +                                           ColumnFamilyHandle 
targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
    +
    +                                           try (RocksIterator iterator = 
restoreDb.newIterator(columnFamilyHandle)) {
    +
    +                                                   iterator.seekToFirst();
    --- End diff --
    
    Instead of `seekToFirst`, could we seek to the first key-group in the 
backend's range (via `seek(keygroupPrefixBytes)`) to potentially skip some 
entries?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to