Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r192335886 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -795,20 +806,241 @@ private void restoreInstance( } /** - * Recovery from local incremental state. + * Recovery from multi incremental states. + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. */ - private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + void restoreFromMultiHandles(Collection<KeyedStateHandle> restoreStateHandles) throws Exception { + + KeyGroupRange targetKeyGroupRange = stateBackend.keyGroupRange; + + chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange); + + int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + targetStartKeyGroupPrefixBytes[j] = (byte) (targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); + try (RestoredDBInstance tmpRestoreDBInfo = restoreDBFromStateHandle( + (IncrementalKeyedStateHandle) rawStateHandle, + temporaryRestoreInstancePath, + targetKeyGroupRange, + 
stateBackend.keyGroupPrefixBytes, + false); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) { + + List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors; + List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i); + ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i); + + ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle( + tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)); + + try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } + + if (stateBackend.keyGroupRange.contains(keyGroup)) { + writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator will visit the record according to the sorted order, + // we can just break here. 
+ break; + } + + iterator.next(); + } + } // releases native iterator resources + } + } finally { + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } + } + } + } + + private class RestoredDBInstance implements AutoCloseable { + + @Nonnull + private final RocksDB db; + + @Nonnull + private final ColumnFamilyHandle defaultColumnFamilyHandle; + + @Nonnull + private final List<ColumnFamilyHandle> columnFamilyHandles; + + @Nonnull + private final List<ColumnFamilyDescriptor> columnFamilyDescriptors; + + @Nonnull + private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots; + + public RestoredDBInstance(@Nonnull RocksDB db, + @Nonnull List<ColumnFamilyHandle> columnFamilyHandles, + @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors, + @Nonnull List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { + this.db = db; + this.columnFamilyHandles = columnFamilyHandles; + this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0); + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + } + + @Override + public void close() throws Exception { --- End diff -- 👍
---