Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r192072966 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -791,20 +784,215 @@ private void restoreInstance( } /** - * Recovery from local incremental state. + * Recovery from multi incremental states. + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. */ - private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + void restoreFromMultiHandles(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + + KeyGroupRange targetKeyGroupRange = stateBackend.keyGroupRange; + + chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange); + + int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + targetStartKeyGroupPrefixBytes[j] = (byte) (targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); + try (RestoredDBInfo tmpRestoreDBInfo = restoreDBFromStateHandle( + (IncrementalKeyedStateHandle) rawStateHandle, + temporaryRestoreInstancePath, + targetKeyGroupRange, + stateBackend.keyGroupPrefixBytes)) { + + List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i); + ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i); + + ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle( + tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)); + + try (RocksIterator iterator = tmpRestoreDBInfo.db.newIterator(tmpColumnFamilyHandle)) { + + iterator.seek(targetStartKeyGroupPrefixBytes); + while (iterator.isValid()) { + // DB has been clip by target key group range, so we do not need to do key rang check in this loop + stateBackend.db.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); + iterator.next(); + } + } // releases native iterator resources + } + } finally { + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } + } + } + } + + private class RestoredDBInfo implements AutoCloseable { + private RocksDB db; --- End diff -- All fields could be `final` and `NonNull` annotated.
---