This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5495a96a38663dc04753cb6db0c100aa2bc7d297
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Fri Jan 5 15:32:32 2024 +0100

    [FLINK-31238] Use IngestDB to speed up RocksDB rescaling recovery (part 2)
---
 .../state/AbstractKeyedStateBackendBuilder.java    |   2 +-
 .../state/EmbeddedRocksDBStateBackend.java         |  43 +-
 .../state/RocksDBConfigurableOptions.java          |  16 +-
 .../state/RocksDBIncrementalCheckpointUtils.java   | 307 ++++++-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  13 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  20 +-
 .../streaming/state/RocksDBOperationUtils.java     |  62 +-
 .../streaming/state/RocksDBStateDownloader.java    |   4 +-
 .../state/restore/RocksDBFullRestoreOperation.java |   1 +
 .../streaming/state/restore/RocksDBHandle.java     |  24 +-
 .../RocksDBHeapTimersFullRestoreOperation.java     |   1 +
 .../RocksDBIncrementalRestoreOperation.java        | 878 ++++++++++++++-------
 .../state/restore/RocksDBNoneRestoreOperation.java |   1 +
 .../state/restore/RocksDBRestoreResult.java        |  14 +-
 .../state/EmbeddedRocksDBStateBackendTest.java     |  49 +-
 .../streaming/state/RocksDBRecoveryTest.java       | 379 +++++++++
 .../state/RocksDBStateBackendConfigTest.java       |  43 +
 .../contrib/streaming/state/RocksDBTestUtils.java  |  35 +-
 .../RocksIncrementalCheckpointRescalingTest.java   |   3 +-
 .../test/checkpointing/AutoRescalingITCase.java    |  14 +-
 20 files changed, 1482 insertions(+), 427 deletions(-)
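
For context: the ingest/clip restore path in this commit replaces per-entry copying during rescaling with RocksDB's export/import and clip primitives. Below is a minimal sketch of that sequence, assuming a RocksDB build that exposes these APIs (which this patch requires); all variable and method names in the sketch are illustrative and not taken from the commit:

    import java.util.Collections;

    import org.rocksdb.Checkpoint;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ExportImportFilesMetaData;
    import org.rocksdb.ImportColumnFamilyOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    class IngestClipSketch {
        // Export a column family from a downloaded DB instance, re-import it into
        // the base DB, then clip it to the backend's key-group range.
        static void exportImportAndClip(
                RocksDB sourceDb,
                ColumnFamilyHandle sourceColumnFamily,
                String exportDir,
                RocksDB targetDb,
                ColumnFamilyDescriptor targetDescriptor,
                byte[] beginKeyBytes,
                byte[] endKeyBytes)
                throws RocksDBException {
            try (Checkpoint checkpoint = Checkpoint.create(sourceDb)) {
                // 1. Export the column family as a set of SST files.
                ExportImportFilesMetaData exported =
                        checkpoint.exportColumnFamily(sourceColumnFamily, exportDir);
                // 2. Import the exported files into the target DB as a new column family.
                try (ImportColumnFamilyOptions importOptions =
                        new ImportColumnFamilyOptions().setMoveFiles(true)) {
                    ColumnFamilyHandle imported =
                            targetDb.createColumnFamilyWithImport(
                                    targetDescriptor,
                                    importOptions,
                                    Collections.singletonList(exported));
                    // 3. Clip to [beginKeyBytes, endKeyBytes): entries outside this range,
                    //    including tombstones, are deleted completely.
                    targetDb.clipColumnFamily(imported, beginKeyBytes, endKeyBytes);
                }
            }
        }
    }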

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
index 00da919b381..c8fe9ef4652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackendBuilder.java
@@ -34,7 +34,7 @@ import java.util.Collection;
 
 /** An abstract base implementation of the {@link StateBackendBuilder} interface. */
 public abstract class AbstractKeyedStateBackendBuilder<K>
-        implements StateBackendBuilder<AbstractKeyedStateBackend, BackendBuildingException> {
+        implements StateBackendBuilder<AbstractKeyedStateBackend<K>, BackendBuildingException> {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     protected final TaskKvStateRegistry kvStateRegistry;
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index 5fa5c68acf1..0c5e748772e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -70,6 +70,7 @@ import java.util.UUID;
 import java.util.function.Supplier;
 
 import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BATCH_SIZE;
@@ -108,8 +109,6 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
 
     private static final double UNDEFINED_OVERLAP_FRACTION_THRESHOLD = -1;
 
-    private static final boolean UNDEFINED_USE_INGEST_DB_RESTORE_MODE = false;
-
     // ------------------------------------------------------------------------
 
     // -- configuration values, set in the application / configuration
@@ -171,9 +170,19 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
      * The threshold of the overlap fraction between the handle's key-group range and target
      * key-group range.
      */
-    private double overlapFractionThreshold;
+    private final double overlapFractionThreshold;
+
+    /**
+     * Whether we use the optimized Ingest/Clip DB method for rescaling RocksDB incremental
+     * checkpoints.
+     */
+    private final TernaryBoolean useIngestDbRestoreMode;
 
-    private boolean useIngestDbRestoreMode;
+    /**
+     * Whether we trigger an async compaction after restores for which we detect state in the
+     * database (including tombstones) that exceeds the proclaimed key-groups range of the backend.
+     */
+    private final TernaryBoolean incrementalRestoreAsyncCompactAfterRescale;
 
     /** Factory for Write Buffer Manager and Block Cache. */
     private RocksDBMemoryFactory rocksDBMemoryFactory;
@@ -207,7 +216,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         this.overlapFractionThreshold = UNDEFINED_OVERLAP_FRACTION_THRESHOLD;
         this.rocksDBMemoryFactory = RocksDBMemoryFactory.DEFAULT;
         this.priorityQueueConfig = new RocksDBPriorityQueueConfig();
-        this.useIngestDbRestoreMode = UNDEFINED_USE_INGEST_DB_RESTORE_MODE;
+        this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
+        this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
     }
 
     /**
@@ -302,10 +312,16 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                 overlapFractionThreshold >= 0 && this.overlapFractionThreshold <= 1,
                 "Overlap fraction threshold of restoring should be between 0 and 1");
 
+        incrementalRestoreAsyncCompactAfterRescale =
+                original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED
+                        ? TernaryBoolean.fromBoxedBoolean(
+                                config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE))
+                        : original.incrementalRestoreAsyncCompactAfterRescale;
+
         useIngestDbRestoreMode =
-                original.useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
-                        ? config.get(USE_INGEST_DB_RESTORE_MODE)
-                        : original.useIngestDbRestoreMode;
+                original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED
+                        ? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
+                        : TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());
 
         this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
     }
@@ -478,6 +494,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                                 resourceContainer.getMemoryWatcherOptions(nativeMetricOptions))
                         .setWriteBatchSize(getWriteBatchSize())
                         .setOverlapFractionThreshold(getOverlapFractionThreshold())
+                        .setIncrementalRestoreAsyncCompactAfterRescale(
+                                getIncrementalRestoreAsyncCompactAfterRescale())
                         .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode());
         return builder.build();
     }
@@ -817,10 +835,13 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                 : overlapFractionThreshold;
     }
 
+    boolean getIncrementalRestoreAsyncCompactAfterRescale() {
+        return incrementalRestoreAsyncCompactAfterRescale.getOrDefault(
+                INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue());
+    }
+
     boolean getUseIngestDbRestoreMode() {
-        return useIngestDbRestoreMode == UNDEFINED_USE_INGEST_DB_RESTORE_MODE
-                ? USE_INGEST_DB_RESTORE_MODE.defaultValue()
-                : useIngestDbRestoreMode;
+        return useIngestDbRestoreMode.getOrDefault(USE_INGEST_DB_RESTORE_MODE.defaultValue());
     }
 
     // ------------------------------------------------------------------------
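
The two TernaryBoolean fields above make these settings three-valued, so an explicit setting on the original backend wins over the snapshot configuration, which in turn wins over the option default. A hedged sketch of that resolution order, using the same org.apache.flink.util.TernaryBoolean helpers as the code above (the method itself is illustrative):

    import org.apache.flink.util.TernaryBoolean;

    class ResolutionSketch {
        // Explicit value on the original backend > value from the configuration
        // (null means unset) > the ConfigOption default.
        static boolean resolve(TernaryBoolean onBackend, Boolean fromConfig, boolean optionDefault) {
            TernaryBoolean merged =
                    onBackend == TernaryBoolean.UNDEFINED
                            ? TernaryBoolean.fromBoxedBoolean(fromConfig) // null stays UNDEFINED
                            : onBackend;
            return merged.getOrDefault(optionDefault);
        }
    }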
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 255182f3502..8acf8af11fa 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -312,9 +312,17 @@ public class RocksDBConfigurableOptions implements Serializable {
     public static final ConfigOption<Boolean> USE_INGEST_DB_RESTORE_MODE =
             key("state.backend.rocksdb.use-ingest-db-restore-mode")
                     .booleanType()
-                    .defaultValue(false)
+                    .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "A recovery mode that directly clips and ingests 
multiple DBs during state recovery if the keys"
+                                    + " in the SST files does not exceed the 
declared key-group range.");
+
+    public static final ConfigOption<Boolean> INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE =
+            key("state.backend.rocksdb.incremental-restore-async-compact-after-rescale")
+                    .booleanType()
+                    .defaultValue(Boolean.FALSE)
                     .withDescription(
-                            "A recovery mode that directly clips and ingests multiple DBs during state recovery. ");
+                            "If true, an async compaction of RocksDB is started after every restore in which we detect keys (including tombstones) in the database that are outside the key-groups range of the backend.");
 
     static final ConfigOption<?>[] CANDIDATE_CONFIGS =
             new ConfigOption<?>[] {
@@ -341,7 +349,9 @@ public class RocksDBConfigurableOptions implements Serializable {
                 USE_BLOOM_FILTER,
                 BLOOM_FILTER_BITS_PER_KEY,
                 BLOOM_FILTER_BLOCK_BASED_MODE,
-                RESTORE_OVERLAP_FRACTION_THRESHOLD
+                RESTORE_OVERLAP_FRACTION_THRESHOLD,
+                USE_INGEST_DB_RESTORE_MODE,
+                INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
             };
 
     private static final Set<ConfigOption<?>> POSITIVE_INT_CONFIG_SET =
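
Both options default to false and can be enabled like any other RocksDB backend option. A minimal, illustrative sketch using the key strings declared above (Configuration#setBoolean is the generic string-keyed setter in org.apache.flink.configuration):

    import org.apache.flink.configuration.Configuration;

    class ConfigSketch {
        static Configuration enableIngestDbRestore() {
            Configuration config = new Configuration();
            config.setBoolean("state.backend.rocksdb.use-ingest-db-restore-mode", true);
            config.setBoolean(
                    "state.backend.rocksdb.incremental-restore-async-compact-after-rescale", true);
            return config;
        }
    }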
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 4fed7e72e4d..2e7342d4e0f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -21,11 +21,17 @@ import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;
 
 import org.rocksdb.Checkpoint;
 import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactRangeOptions;
 import org.rocksdb.ExportImportFilesMetaData;
+import org.rocksdb.LiveFileMetaData;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -33,18 +39,22 @@ import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 
 /** Utils for RocksDB Incremental Checkpoint. */
 public class RocksDBIncrementalCheckpointUtils {
+
     /**
-     * Evaluates state handle's "score" regarding to the target range when choosing the best state
+     * Evaluates state handle's "score" regarding the target range when choosing the best state
      * handle to init the initial db for recovery, if the overlap fraction is less than
      * overlapFractionThreshold, then just return {@code Score.MIN} to mean the handle has no chance
      * to be the initial handle.
@@ -93,9 +103,11 @@ public class RocksDBIncrementalCheckpointUtils {
         }
 
         @Override
-        public int compareTo(@Nonnull Score other) {
-            return Comparator.comparing(Score::getIntersectGroupRange)
-                    .thenComparing(Score::getOverlapFraction)
+        public int compareTo(@Nullable Score other) {
+            return Comparator.nullsFirst(
+                            Comparator.comparing(Score::getIntersectGroupRange)
+                                    .thenComparing(Score::getOverlapFraction))
                     .compare(this, other);
         }
     }
@@ -163,51 +175,187 @@ public class RocksDBIncrementalCheckpointUtils {
     }
 
     /**
-     * Clip the entries in the CF according to the range [begin_key, end_key). Any entries outside
-     * this range will be completely deleted (including tombstones).
+     * Returns true if all entries in the sst files of the given DB are strictly within the expected
+     * key-group range for the DB.
      *
-     * @param db the target need to be clipped.
-     * @param columnFamilyHandles the column family need to be clipped.
-     * @param beginKeyBytes the begin key bytes
-     * @param endKeyBytes the end key bytes
+     * @param db the DB to check.
+     * @param dbExpectedKeyGroupRange the expected key-groups range of the DB.
+     * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys
+     *     in the DB.
+     */
+    public static boolean isSstDataInKeyGroupRange(
+            RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) {
+        return checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange)
+                .allInRange();
+    }
+
+    /**
+     * Returns a range compaction task as runnable if any data in the SST files of the given DB
+     * exceeds the proclaimed key-group range.
+     *
+     * @param db the DB to check and compact if needed.
+     * @param columnFamilyHandles list of column families to check.
+     * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys
+     *     in the DB.
+     * @param dbExpectedKeyGroupRange the expected key-groups range of the DB.
+     * @return runnable that performs compaction upon execution if the key-groups range is exceeded.
+     *     Otherwise, an empty optional is returned.
      */
-    public static void clipColumnFamilies(
+    public static Optional<RunnableWithException> createRangeCompactionTaskIfNeeded(
             RocksDB db,
-            List<ColumnFamilyHandle> columnFamilyHandles,
-            byte[] beginKeyBytes,
-            byte[] endKeyBytes)
-            throws RocksDBException {
+            Collection<ColumnFamilyHandle> columnFamilyHandles,
+            int keyGroupPrefixBytes,
+            KeyGroupRange dbExpectedKeyGroupRange) {
 
-        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-            db.clipColumnFamily(columnFamilyHandle, beginKeyBytes, endKeyBytes);
+        RangeCheckResult rangeCheckResult =
+                checkSstDataAgainstKeyGroupRange(db, keyGroupPrefixBytes, dbExpectedKeyGroupRange);
+
+        if (rangeCheckResult.allInRange()) {
+            // No keys exceed the proclaimed range of the backend, so we don't need a compaction
+            // from this point of view.
+            return Optional.empty();
+        }
+
+        return Optional.of(
+                () -> {
+                    try (CompactRangeOptions compactionOptions =
+                            new CompactRangeOptions()
+                                    .setExclusiveManualCompaction(true)
+                                    .setBottommostLevelCompaction(
+                                            CompactRangeOptions.BottommostLevelCompaction
+                                                    .kForceOptimized)) {
+
+                        if (!rangeCheckResult.leftInRange) {
+                            // Compact all keys before the expected key-groups range
+                            for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+                                db.compactRange(
+                                        columnFamilyHandle,
+                                        // TODO: change to null once this API is fixed
+                                        new byte[] {},
+                                        rangeCheckResult.minKey,
+                                        compactionOptions);
+                            }
+                        }
+
+                        if (!rangeCheckResult.rightInRange) {
+                            // Compact all keys after the expected key-groups range
+                            for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+                                db.compactRange(
+                                        columnFamilyHandle,
+                                        rangeCheckResult.maxKey,
+                                        // TODO: change to null once this API is fixed
+                                        new byte[] {
+                                            (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff
+                                        },
+                                        compactionOptions);
+                            }
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Checks data in the SST files of the given DB for keys that exceed either the lower or upper
+     * bound of the proclaimed key-groups range of the DB.
+     *
+     * @param db the DB to check.
+     * @param keyGroupPrefixBytes the number of bytes used to serialize the key-group prefix of keys
+     *     in the DB.
+     * @param dbExpectedKeyGroupRange the expected key-groups range of the DB.
+     * @return the check result with detailed info about lower and upper bound violations.
+     */
+    private static RangeCheckResult checkSstDataAgainstKeyGroupRange(
+            RocksDB db, int keyGroupPrefixBytes, KeyGroupRange dbExpectedKeyGroupRange) {
+        final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes];
+        final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes];
+
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                dbExpectedKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes);
+
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                dbExpectedKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes);
+
+        KeyRange dbKeyRange = getDBKeyRange(db);
+        Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
+        return RangeCheckResult.of(
+                comparator.compare(dbKeyRange.minKey, beginKeyGroupBytes) >= 0,
+                comparator.compare(dbKeyRange.maxKey, endKeyGroupBytes) < 0,
+                beginKeyGroupBytes,
+                endKeyGroupBytes);
+    }
+
+    /** Returns a pair of minimum and maximum key in the sst files of the given database. */
+    private static KeyRange getDBKeyRange(RocksDB db) {
+        final Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
+        final List<LiveFileMetaData> liveFilesMetaData = db.getLiveFilesMetaData();
+
+        if (liveFilesMetaData.isEmpty()) {
+            return KeyRange.EMPTY;
+        }
+
+        Iterator<LiveFileMetaData> liveFileMetaDataIterator = liveFilesMetaData.iterator();
+        LiveFileMetaData fileMetaData = liveFileMetaDataIterator.next();
+        byte[] smallestKey = fileMetaData.smallestKey();
+        byte[] largestKey = fileMetaData.largestKey();
+        while (liveFileMetaDataIterator.hasNext()) {
+            fileMetaData = liveFileMetaDataIterator.next();
+            byte[] sstSmallestKey = fileMetaData.smallestKey();
+            byte[] sstLargestKey = fileMetaData.largestKey();
+            if (comparator.compare(sstSmallestKey, smallestKey) < 0) {
+                smallestKey = sstSmallestKey;
+            }
+            if (comparator.compare(sstLargestKey, largestKey) > 0) {
+                largestKey = sstLargestKey;
+            }
         }
+        return KeyRange.of(smallestKey, largestKey);
     }
 
-    public static Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> exportColumnFamilies(
+    /**
+     * Exports the data of the given column families in the given DB.
+     *
+     * @param db the DB to export from.
+     * @param columnFamilyHandles the column families to export.
+     * @param registeredStateMetaInfoBases meta information about the registered states in the DB.
+     * @param exportBasePath the path to which the export files go.
+     * @param resultOutput output parameter for the metadata of the export.
+     * @throws RocksDBException on problems inside RocksDB.
+     */
+    public static void exportColumnFamilies(
             RocksDB db,
             List<ColumnFamilyHandle> columnFamilyHandles,
-            List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
-            Path exportBasePath)
+            List<RegisteredStateMetaInfoBase> registeredStateMetaInfoBases,
+            Path exportBasePath,
+            Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> resultOutput)
             throws RocksDBException {
 
-        HashMap<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> cfMetaInfoAndData =
-                new HashMap<>();
+        Preconditions.checkArgument(
+                columnFamilyHandles.size() == registeredStateMetaInfoBases.size(),
+                "Lists are aligned by index and must be of the same size!");
+
         try (final Checkpoint checkpoint = Checkpoint.create(db)) {
             for (int i = 0; i < columnFamilyHandles.size(); i++) {
-                StateMetaInfoSnapshot metaInfoSnapshot = stateMetaInfoSnapshots.get(i);
-
-                RegisteredStateMetaInfoBase stateMetaInfo =
-                        RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(metaInfoSnapshot);
+                RegisteredStateMetaInfoBase stateMetaInfo = registeredStateMetaInfoBases.get(i);
 
-                ExportImportFilesMetaData cfMetaData =
+                Path subPath = exportBasePath.resolve(UUID.randomUUID().toString());
+                ExportImportFilesMetaData exportedColumnFamilyMetaData =
                         checkpoint.exportColumnFamily(
-                                columnFamilyHandles.get(i),
-                                exportBasePath.resolve(UUID.randomUUID().toString()).toString());
-                cfMetaInfoAndData.put(stateMetaInfo, cfMetaData);
+                                columnFamilyHandles.get(i), subPath.toString());
+
+                File[] exportedSstFiles =
+                        subPath.toFile()
+                                .listFiles((file, name) -> name.toLowerCase().endsWith(".sst"));
+
+                if (exportedSstFiles != null && exportedSstFiles.length > 0) {
+                    resultOutput
+                            .computeIfAbsent(stateMetaInfo, (key) -> new ArrayList<>())
+                            .add(exportedColumnFamilyMetaData);
+                } else {
+                    // Close unused empty export result
+                    IOUtils.closeQuietly(exportedColumnFamilyMetaData);
+                }
             }
         }
-
-        return cfMetaInfoAndData;
     }
 
     /** check whether the bytes is before prefixBytes in the character order. */
@@ -228,26 +376,107 @@ public class RocksDBIncrementalCheckpointUtils {
      *
      * @param restoreStateHandles The candidate state handles.
      * @param targetKeyGroupRange The target key group range.
+     * @param overlapFractionThreshold configured threshold for overlap.
      * @return The best candidate or null if no candidate was a good fit.
+     * @param <T> the generic parameter type of the state handles.
      */
     @Nullable
     public static <T extends KeyedStateHandle> T chooseTheBestStateHandleForInitial(
-            @Nonnull Collection<T> restoreStateHandles,
+            @Nonnull List<T> restoreStateHandles,
+            @Nonnull KeyGroupRange targetKeyGroupRange,
+            double overlapFractionThreshold) {
+
+        int pos =
+                findTheBestStateHandleForInitial(
+                        restoreStateHandles, targetKeyGroupRange, overlapFractionThreshold);
+        return pos >= 0 ? restoreStateHandles.get(pos) : null;
+    }
+
+    /**
+     * Choose the best state handle according to the {@link #stateHandleEvaluator(KeyedStateHandle,
+     * KeyGroupRange, double)} to init the initial db from the given lists and returns its index.
+     *
+     * @param restoreStateHandles The candidate state handles.
+     * @param targetKeyGroupRange The target key group range.
+     * @param overlapFractionThreshold configured threshold for overlap.
+     * @return the index of the best candidate handle in the list or -1 if the list was empty.
+     * @param <T> the generic parameter type of the state handles.
+     */
+    public static <T extends KeyedStateHandle> int findTheBestStateHandleForInitial(
+            @Nonnull List<T> restoreStateHandles,
             @Nonnull KeyGroupRange targetKeyGroupRange,
             double overlapFractionThreshold) {
 
-        T bestStateHandle = null;
+        if (restoreStateHandles.isEmpty()) {
+            return -1;
+        }
+
+        // Shortcut for a common case (scale out)
+        if (restoreStateHandles.size() == 1) {
+            return 0;
+        }
+
+        int currentPos = 0;
+        int bestHandlePos = 0;
         Score bestScore = Score.MIN;
         for (T rawStateHandle : restoreStateHandles) {
             Score handleScore =
                     stateHandleEvaluator(
                             rawStateHandle, targetKeyGroupRange, overlapFractionThreshold);
-            if (bestStateHandle == null || handleScore.compareTo(bestScore) > 0) {
-                bestStateHandle = rawStateHandle;
+            if (handleScore.compareTo(bestScore) > 0) {
+                bestHandlePos = currentPos;
                 bestScore = handleScore;
             }
+            ++currentPos;
+        }
+        return bestHandlePos;
+    }
+
+    /** Helper class that defines a key-range in RocksDB as byte arrays for min and max key. */
+    private static final class KeyRange {
+        static final KeyRange EMPTY = KeyRange.of(new byte[0], new byte[0]);
+
+        final byte[] minKey;
+        final byte[] maxKey;
+
+        private KeyRange(byte[] minKey, byte[] maxKey) {
+            this.minKey = minKey;
+            this.maxKey = maxKey;
+        }
+
+        static KeyRange of(byte[] minKey, byte[] maxKey) {
+            return new KeyRange(minKey, maxKey);
         }
+    }
 
-        return bestStateHandle;
+    /**
+     * Helper class that represents the result of a range check of the actual keys in a RocksDB
+     * instance against the proclaimed key-group range of the instance. In short, this checks if the
+     * instance contains any keys (or tombstones for keys) that don't belong in the instance's
+     * key-groups range.
+     */
+    private static final class RangeCheckResult {
+        private final byte[] minKey;
+
+        private final byte[] maxKey;
+        final boolean leftInRange;
+        final boolean rightInRange;
+
+        private RangeCheckResult(
+                boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) {
+            this.leftInRange = leftInRange;
+            this.rightInRange = rightInRange;
+            this.minKey = minKey;
+            this.maxKey = maxKey;
+        }
+
+        boolean allInRange() {
+            return leftInRange && rightInRange;
+        }
+
+        static RangeCheckResult of(
+                boolean leftInRange, boolean rightInRange, byte[] minKey, byte[] maxKey) {
+            return new RangeCheckResult(leftInRange, rightInRange, minKey, maxKey);
+        }
     }
 }
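
A hedged usage sketch for createRangeCompactionTaskIfNeeded: the caller only gets a task when SST data lies outside the expected key-group range, and decides where to run it. The restore operation further below hands the task to a single-thread executor; here it runs synchronously for simplicity (the wrapper class is illustrative):

    import java.util.Collection;
    import java.util.Optional;

    import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils;
    import org.apache.flink.runtime.state.KeyGroupRange;
    import org.apache.flink.util.function.RunnableWithException;

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;

    class CompactionTaskSketch {
        static void compactIfNeeded(
                RocksDB db,
                Collection<ColumnFamilyHandle> columnFamilyHandles,
                int keyGroupPrefixBytes,
                KeyGroupRange expectedRange)
                throws Exception {
            Optional<RunnableWithException> task =
                    RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded(
                            db, columnFamilyHandles, keyGroupPrefixBytes, expectedRange);
            if (task.isPresent()) {
                task.get().run(); // compacts the out-of-range key space
            }
        }
    }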
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index c91b63963f6..8e531ecebf8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -80,6 +80,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -88,8 +89,10 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Spliterator;
 import java.util.Spliterators;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RunnableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -259,6 +262,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
 
+    @Nullable private final CompletableFuture<Void> asyncCompactAfterRestoreFuture;
+
     public RocksDBKeyedStateBackend(
             ClassLoader userCodeClassLoader,
             File instanceBasePath,
@@ -284,7 +289,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             PriorityQueueSetFactory priorityQueueFactory,
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             InternalKeyContext<K> keyContext,
-            @Nonnegative long writeBatchSize) {
+            @Nonnegative long writeBatchSize,
+            @Nullable CompletableFuture<Void> asyncCompactFuture) {
 
         super(
                 kvStateRegistry,
@@ -321,6 +327,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         this.nativeMetricMonitor = nativeMetricMonitor;
         this.sharedRocksKeyBuilder = sharedRocksKeyBuilder;
         this.priorityQueueFactory = priorityQueueFactory;
+        this.asyncCompactAfterRestoreFuture = asyncCompactFuture;
         if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) {
             this.heapPriorityQueuesManager =
                     new HeapPriorityQueuesManager(
@@ -994,4 +1001,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     long getWriteBatchSize() {
         return writeBatchSize;
     }
+
+    public Optional<CompletableFuture<Void>> getAsyncCompactAfterRestoreFuture() {
+        return Optional.ofNullable(asyncCompactAfterRestoreFuture);
+    }
 }
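
A sketch of how a caller or test might block on the new future; an empty optional means no post-restore compaction was scheduled (the timeout below is illustrative, not taken from this commit):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;

    class AwaitCompactionSketch {
        // Waits for a pending post-restore compaction, if one was scheduled.
        static void awaitAsyncCompaction(RocksDBKeyedStateBackend<?> backend) throws Exception {
            CompletableFuture<Void> future =
                    backend.getAsyncCompactAfterRestoreFuture().orElse(null);
            if (future != null) {
                future.get(5, TimeUnit.MINUTES);
            }
        }
    }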
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 9bfb8238b85..623f9c497ba 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -74,8 +74,10 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -125,6 +127,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             RocksDBConfigurableOptions.WRITE_BATCH_SIZE.defaultValue().getBytes();
 
     private RocksDB injectedTestDB; // for testing
+    private boolean incrementalRestoreAsyncCompactAfterRescale =
+            INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue();
     private double overlapFractionThreshold = RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
     private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue();
     private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
@@ -271,6 +275,13 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         return this;
     }
 
+    RocksDBKeyedStateBackendBuilder<K> setIncrementalRestoreAsyncCompactAfterRescale(
+            boolean incrementalRestoreAsyncCompactAfterRescale) {
+        this.incrementalRestoreAsyncCompactAfterRescale =
+                incrementalRestoreAsyncCompactAfterRescale;
+        return this;
+    }
+
     RocksDBKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) {
         this.useIngestDbRestoreMode = useIngestDbRestoreMode;
         return this;
@@ -303,6 +314,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 new LinkedHashMap<>();
         RocksDB db = null;
         RocksDBRestoreOperation restoreOperation = null;
+        CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;
         RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
                 new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
 
@@ -341,6 +353,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 db = restoreResult.getDb();
                 defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
                 nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
+                asyncCompactAfterRestoreFuture =
+                        restoreResult.getAsyncCompactAfterRestoreFuture().orElse(null);
                 if (restoreOperation instanceof RocksDBIncrementalRestoreOperation) {
                     backendUID = restoreResult.getBackendUID();
                     materializedSstFiles = restoreResult.getRestoredSstFiles();
@@ -446,7 +460,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 priorityQueueFactory,
                 ttlCompactFiltersManager,
                 keyContext,
-                writeBatchSize);
+                writeBatchSize,
+                asyncCompactAfterRestoreFuture);
     }
 
     private RocksDBRestoreOperation getRocksDBRestoreOperation(
@@ -490,7 +505,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                     writeBatchSize,
                     optionsContainer.getWriteBufferManagerCapacity(),
                     overlapFractionThreshold,
-                    useIngestDbRestoreMode);
+                    useIngestDbRestoreMode,
+                    incrementalRestoreAsyncCompactAfterRescale);
         } else if (priorityQueueConfig.getPriorityQueueStateType()
                 == EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP) {
             return new RocksDBHeapTimersFullRestoreOperation<>(
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index ccb435dffc7..1f7bcf5ff1a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -119,6 +120,9 @@ public class RocksDBOperationUtils {
      *
      * <p>Creates the column family for the state. Sets TTL compaction filter if {@code
      * ttlCompactFiltersManager} is not {@code null}.
+     *
+     * @param importFilesMetaData if not empty, we import the files specified in the metadata to the
+     *     column family.
      */
     public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
             RegisteredStateMetaInfoBase metaInfoBase,
@@ -126,7 +130,7 @@ public class RocksDBOperationUtils {
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nullable Long writeBufferManagerCapacity,
-            List<ExportImportFilesMetaData> cfMetaDataList) {
+            List<ExportImportFilesMetaData> importFilesMetaData) {
 
         ColumnFamilyDescriptor columnFamilyDescriptor =
                 createColumnFamilyDescriptor(
@@ -135,10 +139,15 @@ public class RocksDBOperationUtils {
                         ttlCompactFiltersManager,
                         writeBufferManagerCapacity);
 
-        ColumnFamilyHandle columnFamilyHandle =
-                cfMetaDataList == null
-                        ? createColumnFamily(columnFamilyDescriptor, db)
-                        : createColumnFamilyWithImport(columnFamilyDescriptor, db, cfMetaDataList);
+        final ColumnFamilyHandle columnFamilyHandle;
+        try {
+            columnFamilyHandle =
+                    createColumnFamily(columnFamilyDescriptor, db, importFilesMetaData);
+        } catch (Exception ex) {
+            IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
+            throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
+        }
+
         return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfoBase);
     }
 
@@ -154,7 +163,7 @@ public class RocksDBOperationUtils {
                 columnFamilyOptionsFactory,
                 ttlCompactFiltersManager,
                 writeBufferManagerCapacity,
-                null);
+                Collections.emptyList());
     }
 
     /**
@@ -168,15 +177,17 @@ public class RocksDBOperationUtils {
             @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             @Nullable Long writeBufferManagerCapacity) {
 
+        byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+        Preconditions.checkState(
+                !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
+                "The chosen state name 'default' collides with the name of the default column family!");
+
         ColumnFamilyOptions options =
                 createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
+
         if (ttlCompactFiltersManager != null) {
             ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
         }
-        byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-        Preconditions.checkState(
-                !Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
-                "The chosen state name 'default' collides with the name of the default column family!");
 
         if (writeBufferManagerCapacity != null) {
             // It'd be great to perform the check earlier, e.g. when creating write buffer manager.
@@ -203,8 +214,7 @@ public class RocksDBOperationUtils {
      * @return true if sanity check passes, false otherwise
      */
     static boolean sanityCheckArenaBlockSize(
-            long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity)
-            throws IllegalStateException {
+            long writeBufferSize, long arenaBlockSizeConfigured, long writeBufferManagerCapacity) {
 
         long defaultArenaBlockSize =
                 RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize);
@@ -243,25 +253,19 @@ public class RocksDBOperationUtils {
     }
 
     private static ColumnFamilyHandle createColumnFamily(
-            ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
-        try {
-            return db.createColumnFamily(columnDescriptor);
-        } catch (RocksDBException e) {
-            IOUtils.closeQuietly(columnDescriptor.getOptions());
-            throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", e);
-        }
-    }
-
-    private static ColumnFamilyHandle createColumnFamilyWithImport(
             ColumnFamilyDescriptor columnDescriptor,
             RocksDB db,
-            List<ExportImportFilesMetaData> metaDataList) {
-        try {
-            return db.createColumnFamilyWithImport(
-                    columnDescriptor, new ImportColumnFamilyOptions(), metaDataList);
-        } catch (RocksDBException e) {
-            IOUtils.closeQuietly(columnDescriptor.getOptions());
-            throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", e);
+            List<ExportImportFilesMetaData> importFilesMetaData)
+            throws RocksDBException {
+
+        if (importFilesMetaData.isEmpty()) {
+            return db.createColumnFamily(columnDescriptor);
+        } else {
+            try (ImportColumnFamilyOptions importColumnFamilyOptions =
+                    new ImportColumnFamilyOptions().setMoveFiles(true)) {
+                return db.createColumnFamilyWithImport(
+                        columnDescriptor, importColumnFamilyOptions, importFilesMetaData);
+            }
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index af1694c66e7..f7ea297d761 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -29,8 +29,6 @@ import org.apache.flink.util.clock.SystemClock;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.file.Files;
@@ -108,7 +106,7 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
                 .flatMap(
                         downloadRequest ->
                                 // Take all files from shared and private state.
-                                Streams.concat(
+                                Stream.concat(
                                                 downloadRequest.getStateHandle().getSharedState()
                                                         .stream(),
                                                 downloadRequest.getStateHandle().getPrivateState()
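
The change above swaps the shaded Guava Streams.concat for the JDK's java.util.stream.Stream.concat, which behaves identically for two streams. A trivial illustrative sketch:

    import java.util.stream.Stream;

    class ConcatSketch {
        static Stream<String> allFiles(Stream<String> shared, Stream<String> privateFiles) {
            // Same result as the shaded Guava Streams.concat(shared, privateFiles).
            return Stream.concat(shared, privateFiles);
        }
    }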
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
index 4005adde565..228ba065e11 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java
@@ -108,6 +108,7 @@ public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
                 this.rocksHandle.getNativeMetricMonitor(),
                 -1,
                 null,
+                null,
                 null);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
index b5d3c7efc74..a2ed8f1e33f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHandle.java
@@ -187,18 +187,19 @@ class RocksDBHandle implements AutoCloseable {
         return registeredStateMetaInfoEntry;
     }
 
-    RocksDbKvStateInfo registerStateColumnFamilyHandleWithImport(
+    /**
+     * Registers a new column family and imports data from the given export.
+     *
+     * @param stateMetaInfo info about the state to create.
+     * @param cfMetaDataList the data to import.
+     */
+    void registerStateColumnFamilyHandleWithImport(
             RegisteredStateMetaInfoBase stateMetaInfo,
             List<ExportImportFilesMetaData> cfMetaDataList) {
 
-        RocksDbKvStateInfo registeredStateMetaInfoEntry =
-                kvStateInformation.get(stateMetaInfo.getName());
-        if (registeredStateMetaInfoEntry != null) {
-            System.out.println("test");
-        }
-        Preconditions.checkState(registeredStateMetaInfoEntry == null);
+        Preconditions.checkState(!kvStateInformation.containsKey(stateMetaInfo.getName()));
 
-        registeredStateMetaInfoEntry =
+        RocksDbKvStateInfo stateInfo =
                 RocksDBOperationUtils.createStateInfo(
                         stateMetaInfo,
                         db,
@@ -208,12 +209,9 @@ class RocksDBHandle implements AutoCloseable {
                         cfMetaDataList);
 
         RocksDBOperationUtils.registerKvStateInformation(
-                kvStateInformation,
-                nativeMetricMonitor,
-                stateMetaInfo.getName(),
-                registeredStateMetaInfoEntry);
+                kvStateInformation, nativeMetricMonitor, stateMetaInfo.getName(), stateInfo);
 
-        return registeredStateMetaInfoEntry;
+        columnFamilyHandles.add(stateInfo.columnFamilyHandle);
     }
 
     /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
index 0c859e88f6b..649aa572a62 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBHeapTimersFullRestoreOperation.java
@@ -138,6 +138,7 @@ public class RocksDBHeapTimersFullRestoreOperation<K> implements RocksDBRestoreO
                 this.rocksHandle.getNativeMetricMonitor(),
                 -1,
                 null,
+                null,
                 null);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index 01559ea55d8..0a5e9744c29 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -51,7 +51,8 @@ import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.RunnableWithException;
 
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
@@ -74,7 +75,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,7 +82,11 @@ import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.runtime.state.StateUtil.unexpectedStateHandleException;
 
@@ -111,7 +115,9 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
 
     private boolean isKeySerializerCompatibilityChecked;
 
-    private ThrowingConsumer<Collection<KeyedStateHandle>, Exception> rescalingRestoreOperation;
+    private final boolean useIngestDbRestoreMode;
+
+    private final boolean asyncCompactAfterRescale;
 
     public RocksDBIncrementalRestoreOperation(
             String operatorIdentifier,
@@ -134,7 +140,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             @Nonnegative long writeBatchSize,
             Long writeBufferManagerCapacity,
             double overlapFractionThreshold,
-            boolean useIngestDbRestoreMode) {
+            boolean useIngestDbRestoreMode,
+            boolean asyncCompactAfterRescale) {
         this.rocksHandle =
                 new RocksDBHandle(
                         kvStateInformation,
@@ -160,8 +167,8 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.keySerializerProvider = keySerializerProvider;
         this.userCodeClassLoader = userCodeClassLoader;
-        this.rescalingRestoreOperation =
-                useIngestDbRestoreMode ? this::restoreWithIngestDbMode : this::restoreWithRescaling;
+        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
+        this.asyncCompactAfterRescale = asyncCompactAfterRescale;
     }
 
     /** Root method that branches for different implementations of {@link KeyedStateHandle}. */
@@ -172,53 +179,457 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper
             return null;
         }
 
-        final KeyedStateHandle theFirstStateHandle = restoreStateHandles.iterator().next();
+        final List<StateHandleDownloadSpec> allDownloadSpecs =
+                new ArrayList<>(restoreStateHandles.size());
+
+        final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles =
+                new ArrayList<>(restoreStateHandles.size());
 
-        boolean isRescaling =
-                (restoreStateHandles.size() > 1
-                        || !Objects.equals(theFirstStateHandle.getKeyGroupRange(), keyGroupRange));
+        final Path absolutInstanceBasePath = instanceBasePath.getAbsoluteFile().toPath();
 
-        if (isRescaling) {
-            rescalingRestoreOperation.accept(restoreStateHandles);
-        } else {
-            restoreWithoutRescaling(theFirstStateHandle);
+        try {
+            prepareStateHandleDownloadsToLocal(
+                    absolutInstanceBasePath, localKeyedStateHandles, allDownloadSpecs);
+
+            if (localKeyedStateHandles.size() == 1) {
+                // This happens if we don't rescale and for some scale out scenarios.
+                initBaseDBFromSingleStateHandle(localKeyedStateHandles.get(0));
+            } else {
+                // This happens for all scale ins and some scale outs.
+                restoreFromMultipleStateHandles(localKeyedStateHandles);
+            }
+
+            CompletableFuture<Void> asyncCompactFuture = null;
+            if (asyncCompactAfterRescale) {
+                asyncCompactFuture =
+                        RocksDBIncrementalCheckpointUtils.createRangeCompactionTaskIfNeeded(
+                                        rocksHandle.getDb(),
+                                        rocksHandle.getColumnFamilyHandles(),
+                                        keyGroupPrefixBytes,
+                                        keyGroupRange)
+                                .map(
+                                        (run) -> {
+                                            RunnableWithException runWithLogging =
+                                                    () -> {
+                                                        long t = System.currentTimeMillis();
+                                                        logger.info(
+                                                                "Starting async compaction after restore for backend {} in operator {}",
+                                                                backendUID,
+                                                                operatorIdentifier);
+                                                        try {
+                                                            run.run();
+                                                            logger.info(
+                                                                    "Completed async compaction after restore for backend {} in operator {} after {} ms.",
+                                                                    backendUID,
+                                                                    operatorIdentifier,
+                                                                    System.currentTimeMillis() - t);
+                                                        } catch (Exception ex) {
+                                                            logger.info(
+                                                                    "Failed async compaction after restore for backend {} in operator {} after {} ms.",
+                                                                    backendUID,
+                                                                    operatorIdentifier,
+                                                                    System.currentTimeMillis() - t,
+                                                                    ex);
+                                                            throw ex;
+                                                        }
+                                                    };
+                                            ExecutorService executorService =
+                                                    Executors.newSingleThreadExecutor();
+                                            CompletableFuture<Void> resultFuture =
+                                                    FutureUtils.runAsync(
+                                                            runWithLogging, executorService);
+                                            executorService.shutdown();
+                                            return resultFuture;
+                                        })
+                                .orElse(null);
+            }
+
+            return new RocksDBRestoreResult(
+                    this.rocksHandle.getDb(),
+                    this.rocksHandle.getDefaultColumnFamilyHandle(),
+                    this.rocksHandle.getNativeMetricMonitor(),
+                    lastCompletedCheckpointId,
+                    backendUID,
+                    restoredSstFiles,
+                    asyncCompactFuture);
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
         }
-        return new RocksDBRestoreResult(
-                this.rocksHandle.getDb(),
-                this.rocksHandle.getDefaultColumnFamilyHandle(),
-                this.rocksHandle.getNativeMetricMonitor(),
-                lastCompletedCheckpointId,
-                backendUID,
-                restoredSstFiles);
     }
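
Note on the executor handling above: calling shutdown() immediately after
submitting the compaction task does not cancel it; it only rejects new
submissions, so the single worker thread finishes the queued task and then
terminates without leaking. A minimal standalone sketch of the same
submit-then-shutdown idiom (plain CompletableFuture in place of Flink's
FutureUtils.runAsync):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SubmitThenShutdownSketch {
        public static void main(String[] args) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            // The task is submitted first ...
            CompletableFuture<Void> future =
                    CompletableFuture.runAsync(
                            () -> System.out.println("compacting..."), executor);
            // ... so shutdown() only forbids further submissions; the worker
            // completes the running task and the executor then terminates.
            executor.shutdown();
            future.join();
        }
    }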
 
-    /** Recovery from a single remote incremental state without rescaling. */
+    /**
+     * Prepares the download of all {@link IncrementalRemoteKeyedStateHandle}s to {@link
+     * IncrementalLocalKeyedStateHandle}s by creating the download specs and eagerly converting
+     * the handle type.
+     *
+     * @param absolutInstanceBasePath the base path of the restoring DB instance as an absolute path.
+     * @param localKeyedStateHandlesOut the output parameter for the created 
{@link
+     *     IncrementalLocalKeyedStateHandle}s.
+     * @param allDownloadSpecsOut output parameter for the created download 
specs.
+     * @throws Exception if an unexpected state handle type is passed as 
argument.
+     */
     @SuppressWarnings("unchecked")
-    private void restoreWithoutRescaling(KeyedStateHandle keyedStateHandle) 
throws Exception {
+    private void prepareStateHandleDownloadsToLocal(
+            Path absolutInstanceBasePath,
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandlesOut,
+            List<StateHandleDownloadSpec> allDownloadSpecsOut)
+            throws Exception {
+        // Prepare and collect all the download requests to pull remote state to a local directory
+        for (KeyedStateHandle stateHandle : restoreStateHandles) {
+            if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+                StateHandleDownloadSpec downloadRequest =
+                        new StateHandleDownloadSpec(
+                                (IncrementalRemoteKeyedStateHandle) 
stateHandle,
+                                
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
+                allDownloadSpecsOut.add(downloadRequest);
+            } else if (stateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+                
localKeyedStateHandlesOut.add((IncrementalLocalKeyedStateHandle) stateHandle);
+            } else {
+                throw unexpectedStateHandleException(
+                        new Class[] {
+                            IncrementalRemoteKeyedStateHandle.class,
+                            IncrementalLocalKeyedStateHandle.class
+                        },
+                        stateHandle.getClass());
+            }
+        }
+
+        allDownloadSpecsOut.stream()
+                
.map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
+                .forEach(localKeyedStateHandlesOut::add);
+
+        transferRemoteStateToLocalDirectory(allDownloadSpecsOut);
+    }
+
+    /**
+     * Initializes the base DB that we restore from a single local state 
handle.
+     *
+     * @param stateHandle the state handle to restore the base DB from.
+     * @throws Exception on any error during restore.
+     */
+    private void 
initBaseDBFromSingleStateHandle(IncrementalLocalKeyedStateHandle stateHandle)
+            throws Exception {
+
         logger.info(
-                "Starting to restore from state handle: {} without 
rescaling.", keyedStateHandle);
-        if (keyedStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
-            IncrementalRemoteKeyedStateHandle 
incrementalRemoteKeyedStateHandle =
-                    (IncrementalRemoteKeyedStateHandle) keyedStateHandle;
-            
restorePreviousIncrementalFilesStatus(incrementalRemoteKeyedStateHandle);
-            restoreBaseDBFromRemoteState(incrementalRemoteKeyedStateHandle);
-        } else if (keyedStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-            IncrementalLocalKeyedStateHandle incrementalLocalKeyedStateHandle =
-                    (IncrementalLocalKeyedStateHandle) keyedStateHandle;
-            
restorePreviousIncrementalFilesStatus(incrementalLocalKeyedStateHandle);
-            restoreBaseDBFromLocalState(incrementalLocalKeyedStateHandle);
+                "Starting to restore Base DB in backend with range {} in 
operator {} from selected state handle {}.",
+                keyGroupRange,
+                operatorIdentifier,
+                stateHandle);
+
+        // Restore base DB from selected initial handle
+        restoreBaseDBFromLocalState(stateHandle);
+
+        KeyGroupRange stateHandleKeyGroupRange = 
stateHandle.getKeyGroupRange();
+
+        // Check if the key-groups range has changed.
+        if (Objects.equals(stateHandleKeyGroupRange, keyGroupRange)) {
+            // This is the case if we didn't rescale, so we can restore all 
the info from the
+            // previous backend instance (backend id and incremental 
checkpoint history).
+            restorePreviousIncrementalFilesStatus(stateHandle);
+        } else {
+            // If the key-groups don't match, this was a scale out, and we 
need to clip the
+            // key-groups range of the db to the target range for this backend.
+            try {
+                RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+                        this.rocksHandle.getDb(),
+                        this.rocksHandle.getColumnFamilyHandles(),
+                        keyGroupRange,
+                        stateHandleKeyGroupRange,
+                        keyGroupPrefixBytes);
+            } catch (RocksDBException e) {
+                String errMsg = "Failed to clip DB after initialization.";
+                logger.error(errMsg, e);
+                throw new BackendBuildingException(errMsg, e);
+            }
+        }
+        logger.info(
+                "Completed restoring backend with range {} in operator {} from 
selected state handle.",
+                keyGroupRange,
+                operatorIdentifier);
+    }
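
clipDBWithKeyGroupRange lives in RocksDBIncrementalCheckpointUtils and is not
shown in this hunk; judging from its arguments (current range of the restored
data, target range of this backend, and the key-group prefix length), its
assumed effect is to drop the key-group sub-ranges outside the target range via
RocksDB range deletes. A rough sketch of that semantics (helper and class names
hypothetical):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    final class ClipSketch {
        // Serializes a key group as a fixed-width, big-endian prefix.
        static byte[] prefix(int keyGroup, int prefixBytes) {
            byte[] out = new byte[prefixBytes];
            for (int i = prefixBytes - 1; i >= 0; --i) {
                out[i] = (byte) (keyGroup & 0xFF);
                keyGroup >>>= 8;
            }
            return out;
        }

        // Deletes all key groups outside [targetStart, targetEnd] from one
        // column family; deleteRange operates on half-open [begin, end) ranges.
        static void clip(
                RocksDB db,
                ColumnFamilyHandle cf,
                int currentStart,
                int currentEnd,
                int targetStart,
                int targetEnd,
                int prefixBytes)
                throws RocksDBException {
            if (currentStart < targetStart) {
                db.deleteRange(
                        cf, prefix(currentStart, prefixBytes), prefix(targetStart, prefixBytes));
            }
            if (currentEnd > targetEnd) {
                db.deleteRange(
                        cf, prefix(targetEnd + 1, prefixBytes), prefix(currentEnd + 1, prefixBytes));
            }
        }
    }

The big-endian prefix is also why restoreFromMultipleStateHandles below can
serialize keyGroupRange.getEndKeyGroup() + 1 as an exclusive upper bound.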
+
+    /**
+     * Initializes the base DB that we restore from a list of multiple local 
state handles.
+     *
+     * @param localKeyedStateHandles the list of state handles to restore the 
base DB from.
+     * @throws Exception on any error during restore.
+     */
+    private void restoreFromMultipleStateHandles(
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles) 
throws Exception {
+
+        logger.info(
+                "Starting to restore backend with range {} in operator {} from 
multiple state handles {} with useIngestDbRestoreMode = {}.",
+                keyGroupRange,
+                operatorIdentifier,
+                localKeyedStateHandles,
+                useIngestDbRestoreMode);
+
+        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
+
+        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+        CompositeKeySerializationUtils.serializeKeyGroup(
+                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
+
+        if (useIngestDbRestoreMode) {
+            // Optimized path for merging multiple handles with Ingest/Clip
+            mergeStateHandlesWithClipAndIngest(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
         } else {
-            throw unexpectedStateHandleException(
-                    new Class[] {
-                        IncrementalRemoteKeyedStateHandle.class,
-                        IncrementalLocalKeyedStateHandle.class
-                    },
-                    keyedStateHandle.getClass());
+            // Optimized path for single handle and legacy path for merging 
multiple handles.
+            mergeStateHandlesWithCopyFromTemporaryInstance(
+                    localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+        }
+
+        logger.info(
+                "Completed restoring backend with range {} in operator {} from 
multiple state handles with useIngestDbRestoreMode = {}.",
+                keyGroupRange,
+                operatorIdentifier,
+                useIngestDbRestoreMode);
+    }
+
+    /**
+     * Restores the base DB by merging multiple state handles into one. For each handle, this
+     * method first checks whether all data to import lies within the handle's proclaimed
+     * key-groups range and, if so, merges it via export/import. Handles that fail the check
+     * fall back to copying the data through a temporary DB instance.
+     *
+     * @param localKeyedStateHandles the list of state handles to restore the 
base DB from.
+     * @param startKeyGroupPrefixBytes the min/start key of the key groups 
range as bytes.
+     * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups 
range as bytes.
+     * @throws Exception on any restore error.
+     */
+    private void mergeStateHandlesWithClipAndIngest(
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
+        final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
+        Files.createDirectories(exportCfBasePath);
+
+        final Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                exportedColumnFamilyMetaData = new 
HashMap<>(localKeyedStateHandles.size());
+
+        final List<IncrementalLocalKeyedStateHandle> notImportableHandles =
+                new ArrayList<>(localKeyedStateHandles.size());
+
+        try {
+
+            KeyGroupRange exportedSstKeyGroupsRange =
+                    exportColumnFamiliesWithSstDataInKeyGroupsRange(
+                            exportCfBasePath,
+                            localKeyedStateHandles,
+                            exportedColumnFamilyMetaData,
+                            notImportableHandles);
+
+            if (exportedColumnFamilyMetaData.isEmpty()) {
+                // Nothing could be exported, so we fall back to
+                // #mergeStateHandlesWithCopyFromTemporaryInstance
+                mergeStateHandlesWithCopyFromTemporaryInstance(
+                        notImportableHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+            } else {
+                // We initialize the base DB by importing all the exported 
data.
+                initBaseDBFromColumnFamilyImports(
+                        exportedColumnFamilyMetaData, 
exportedSstKeyGroupsRange);
+                // Copy data from handles that we couldn't directly import 
using temporary
+                // instances.
+                copyToBaseDBUsingTempDBs(
+                        notImportableHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+            }
+        } finally {
+            // Close native RocksDB objects
+            
exportedColumnFamilyMetaData.values().forEach(IOUtils::closeAllQuietly);
+            // Cleanup export base directory
+            cleanUpPathQuietly(exportCfBasePath);
+        }
+    }
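
The export/import above goes through RocksDBIncrementalCheckpointUtils and the
rocksHandle, but the underlying RocksJava primitives it presumably builds on
look roughly like this (names from the RocksJava API; availability depends on
the FRocksDB version bundled with Flink):

    import org.rocksdb.Checkpoint;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ExportImportFilesMetaData;
    import org.rocksdb.ImportColumnFamilyOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    final class ExportImportSketch {
        // Exports one column family of a temporary DB as SST files and imports
        // them into the base DB without rewriting the data.
        static ColumnFamilyHandle exportThenImport(
                RocksDB tempDb,
                ColumnFamilyHandle sourceColumnFamily,
                RocksDB baseDb,
                ColumnFamilyDescriptor targetDescriptor,
                String exportDir)
                throws RocksDBException {
            try (Checkpoint checkpoint = Checkpoint.create(tempDb);
                    ImportColumnFamilyOptions importOptions = new ImportColumnFamilyOptions();
                    ExportImportFilesMetaData metaData =
                            checkpoint.exportColumnFamily(sourceColumnFamily, exportDir)) {
                return baseDb.createColumnFamilyWithImport(
                        targetDescriptor, importOptions, metaData);
            }
        }
    }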
+
+    /**
+     * Prepares the data for importing by exporting it from temporary RocksDB instances. We can
+     * only import data that does not exceed the target key-groups range, so state handles whose
+     * data exceeds their proclaimed range are skipped.
+     *
+     * @param exportCfBasePath the base path for the export files.
+     * @param localKeyedStateHandles the state handles to prepare for import.
+     * @param exportedColumnFamiliesOut output parameter for the metadata of 
completed exports.
+     * @param skipped output parameter for state handles that could not be 
exported because the data
+     *     exceeded the proclaimed range.
+     * @return the total key-groups range of the exported data.
+     * @throws Exception on any export error.
+     */
+    private KeyGroupRange exportColumnFamiliesWithSstDataInKeyGroupsRange(
+            Path exportCfBasePath,
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                    exportedColumnFamiliesOut,
+            List<IncrementalLocalKeyedStateHandle> skipped)
+            throws Exception {
+
+        logger.info(
+                "Starting restore export for backend with range {} in operator 
{}.",
+                keyGroupRange,
+                operatorIdentifier);
+
+        int minExportKeyGroup = Integer.MAX_VALUE;
+        int maxExportKeyGroup = Integer.MIN_VALUE;
+        for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
+            try (RestoredDBInstance tmpRestoreDBInfo =
+                    restoreTempDBInstanceFromLocalState(stateHandle)) {
+
+                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                        tmpRestoreDBInfo.columnFamilyHandles;
+
+                // Check if the data in all SST files referenced in the handle 
is within the
+                // proclaimed key-groups range of the handle.
+                if (RocksDBIncrementalCheckpointUtils.isSstDataInKeyGroupRange(
+                        tmpRestoreDBInfo.db, keyGroupPrefixBytes, 
stateHandle.getKeyGroupRange())) {
+
+                    logger.debug(
+                            "Exporting state handle {} for backend with range 
{} in operator {}.",
+                            stateHandle,
+                            keyGroupRange,
+                            operatorIdentifier);
+
+                    List<RegisteredStateMetaInfoBase> 
registeredStateMetaInfoBases =
+                            tmpRestoreDBInfo.stateMetaInfoSnapshots.stream()
+                                    
.map(RegisteredStateMetaInfoBase::fromMetaInfoSnapshot)
+                                    .collect(Collectors.toList());
+
+                    // Export all the Column Families and store the result in
+                    // exportedColumnFamiliesOut
+                    RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandles,
+                            registeredStateMetaInfoBases,
+                            exportCfBasePath,
+                            exportedColumnFamiliesOut);
+
+                    minExportKeyGroup =
+                            Math.min(
+                                    minExportKeyGroup,
+                                    
stateHandle.getKeyGroupRange().getStartKeyGroup());
+                    maxExportKeyGroup =
+                            Math.max(
+                                    maxExportKeyGroup,
+                                    
stateHandle.getKeyGroupRange().getEndKeyGroup());
+
+                    logger.debug(
+                            "Done exporting state handle {} for backend with 
range {} in operator {}.",
+                            stateHandle,
+                            keyGroupRange,
+                            operatorIdentifier);
+                } else {
+                    // Actual key range in files exceeds proclaimed range, 
cannot import. We
+                    // will copy this handle using a temporary DB later.
+                    skipped.add(stateHandle);
+                }
+            }
         }
+
+        logger.info(
+                "Completed restore export for backend with range {} in 
operator {}. Number of exported handles: {}. Skipped handles: {}.",
+                keyGroupRange,
+                operatorIdentifier,
+                localKeyedStateHandles.size() - skipped.size(),
+                skipped);
+
+        return minExportKeyGroup <= maxExportKeyGroup
+                ? new KeyGroupRange(minExportKeyGroup, maxExportKeyGroup)
+                : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
+    }
+
+    /**
+     * Helper method that merges the data from multiple state handles into the restoring base DB
+     * by copying through temporary RocksDB instances.
+     *
+     * @param localKeyedStateHandles the state handles to merge into the base 
DB.
+     * @param startKeyGroupPrefixBytes the min/start key of the key groups 
range as bytes.
+     * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups 
range as bytes.
+     * @throws Exception on any merge error.
+     */
+    private void mergeStateHandlesWithCopyFromTemporaryInstance(
+            List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
+            throws Exception {
+
+        logger.info(
+                "Starting to merge state for backend with range {} in operator 
{} from multiple state handles using temporary instances.",
+                keyGroupRange,
+                operatorIdentifier);
+
+        // Choose the best state handle for the initial DB and remove it from the
+        // list so that we don't restore it twice.
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                localKeyedStateHandles.remove(
+                        
RocksDBIncrementalCheckpointUtils.findTheBestStateHandleForInitial(
+                                localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold));
+
+        Preconditions.checkNotNull(selectedInitialHandle);
+
+        // Init the base DB instance with the initial state
+        initBaseDBFromSingleStateHandle(selectedInitialHandle);
+
+        // Copy remaining handles using temporary RocksDB instances
+        copyToBaseDBUsingTempDBs(
+                localKeyedStateHandles, startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes);
+
+        logger.info(
+                "Completed merging state for backend with range {} in operator 
{} from multiple state handles using temporary instances.",
+                keyGroupRange,
+                operatorIdentifier);
+    }
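
findTheBestStateHandleForInitial is defined elsewhere in this change; given its
arguments (the candidate handles, the backend's target range, and
overlapFractionThreshold), the assumed scoring is an overlap fraction between
each handle's key-group range and the target range, along these lines:

    import org.apache.flink.runtime.state.KeyGroupRange;

    final class OverlapSketch {
        // Fraction of the handle's key groups that fall into the target range;
        // the handle with the largest usable overlap makes the cheapest initial DB.
        static double overlapFraction(KeyGroupRange handleRange, KeyGroupRange targetRange) {
            KeyGroupRange intersection = handleRange.getIntersection(targetRange);
            return (double) intersection.getNumberOfKeyGroups()
                    / handleRange.getNumberOfKeyGroups();
        }
    }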
+
+    /**
+     * Initializes the base DB by importing from previously exported data.
+     *
+     * @param exportedColumnFamilyMetaData the export (meta) data.
+     * @param exportKeyGroupRange the total key-groups range of the exported 
data.
+     * @throws Exception on import error.
+     */
+    private void initBaseDBFromColumnFamilyImports(
+            Map<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>>
+                    exportedColumnFamilyMetaData,
+            KeyGroupRange exportKeyGroupRange)
+            throws Exception {
+
+        // We initialize the base DB by importing all the exported data.
+        logger.info(
+                "Starting to import exported state handles for backend with 
range {} in operator {} using Clip/Ingest DB.",
+                keyGroupRange,
+                operatorIdentifier);
+        rocksHandle.openDB();
+        exportedColumnFamilyMetaData.forEach(
+                rocksHandle::registerStateColumnFamilyHandleWithImport);
+
+        // Use Range delete to clip the temp db to the target range of the 
backend
+        RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
+                rocksHandle.getDb(),
+                rocksHandle.getColumnFamilyHandles(),
+                keyGroupRange,
+                exportKeyGroupRange,
+                keyGroupPrefixBytes);
+
         logger.info(
-                "Finished restoring from state handle: {} without rescaling.", 
keyedStateHandle);
+                "Completed importing exported state handles for backend with 
range {} in operator {} using Clip/Ingest DB.",
+                keyGroupRange,
+                operatorIdentifier);
     }
 
+    /**
+     * Restores the checkpointing status and state for this backend. This can 
only be done if the
+     * backend was not rescaled and is therefore identical to the source 
backend in the previous
+     * run.
+     *
+     * @param localKeyedStateHandle the single state handle from which the 
backend is restored.
+     */
     private void restorePreviousIncrementalFilesStatus(
             IncrementalKeyedStateHandle localKeyedStateHandle) {
         backendUID = localKeyedStateHandle.getBackendIdentifier();
@@ -226,24 +637,20 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 localKeyedStateHandle.getCheckpointId(),
                 localKeyedStateHandle.getSharedStateHandles());
         lastCompletedCheckpointId = localKeyedStateHandle.getCheckpointId();
+        logger.info(
+                "Restored previous incremental files status in backend with 
range {} in operator {}: backend uuid {}, last checkpoint id {}.",
+                keyGroupRange,
+                operatorIdentifier,
+                backendUID,
+                lastCompletedCheckpointId);
     }
 
-    private void 
restoreBaseDBFromRemoteState(IncrementalRemoteKeyedStateHandle stateHandle)
-            throws Exception {
-        // used as restore source for IncrementalRemoteKeyedStateHandle
-        final Path tmpRestoreInstancePath =
-                
instanceBasePath.getAbsoluteFile().toPath().resolve(UUID.randomUUID().toString());
-        final StateHandleDownloadSpec downloadRequest =
-                new StateHandleDownloadSpec(stateHandle, 
tmpRestoreInstancePath);
-        try {
-            
transferRemoteStateToLocalDirectory(Collections.singletonList(downloadRequest));
-            
restoreBaseDBFromLocalState(downloadRequest.createLocalStateHandleForDownloadedState());
-        } finally {
-            cleanUpPathQuietly(downloadRequest.getDownloadDestination());
-        }
-    }
-
-    /** Restores RocksDB instance from local state. */
+    /**
+     * Restores the base DB from local state of a single state handle.
+     *
+     * @param localKeyedStateHandle the state handle to restore from.
+     * @throws Exception on any restore error.
+     */
     private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle)
             throws Exception {
         KeyedBackendSerializationProxy<K> serializationProxy =
@@ -264,268 +671,142 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 restoreSourcePath);
     }
 
+    /**
+     * Helper method to download files, as specified in the given download 
specs, to the local
+     * directory.
+     *
+     * @param downloadSpecs specifications of files to download.
+     * @throws Exception on any download error.
     private void transferRemoteStateToLocalDirectory(
-            Collection<StateHandleDownloadSpec> downloadRequests) throws 
Exception {
+            Collection<StateHandleDownloadSpec> downloadSpecs) throws 
Exception {
         try (RocksDBStateDownloader rocksDBStateDownloader =
                 new RocksDBStateDownloader(
                         numberOfTransferringThreads, 
customInitializationMetrics)) {
             rocksDBStateDownloader.transferAllStateDataToDirectory(
-                    downloadRequests, cancelStreamRegistry);
-        }
-    }
-
-    private void cleanUpPathQuietly(@Nonnull Path path) {
-        try {
-            FileUtils.deleteDirectory(path.toFile());
-        } catch (IOException ex) {
-            logger.warn("Failed to clean up path " + path, ex);
+                    downloadSpecs, cancelStreamRegistry);
         }
     }
 
     /**
-     * Recovery from multi incremental states with rescaling. For rescaling, 
this method creates a
-     * temporary RocksDB instance for a key-groups shard. All contents from 
the temporary instance
-     * are copied into the real restore instance and then the temporary 
instance is discarded.
+     * Helper method to copy all data from the given local state handles to 
the base DB by using
+     * temporary DB instances.
+     *
+     * @param toImport the state handles to import.
+     * @param startKeyGroupPrefixBytes the min/start key of the key groups 
range as bytes.
+     * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups 
range as bytes.
+     * @throws Exception on any copy error.
      */
-    private void restoreWithRescaling(Collection<KeyedStateHandle> 
restoreStateHandles)
+    private void copyToBaseDBUsingTempDBs(
+            List<IncrementalLocalKeyedStateHandle> toImport,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
             throws Exception {
 
-        Preconditions.checkArgument(restoreStateHandles != null && 
!restoreStateHandles.isEmpty());
-
-        final List<StateHandleDownloadSpec> allDownloadSpecs = new 
ArrayList<>();
-
-        final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles =
-                new ArrayList<>(restoreStateHandles.size());
-
-        final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
-
-        // Prepare and collect all the download request to pull remote state 
to a local directory
-        for (KeyedStateHandle stateHandle : restoreStateHandles) {
-            if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
-                StateHandleDownloadSpec downloadRequest =
-                        new StateHandleDownloadSpec(
-                                (IncrementalRemoteKeyedStateHandle) 
stateHandle,
-                                
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
-                allDownloadSpecs.add(downloadRequest);
-            } else if (stateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
-                localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) 
stateHandle);
-            } else {
-                throw unexpectedStateHandleException(
-                        IncrementalRemoteKeyedStateHandle.class, 
stateHandle.getClass());
-            }
+        if (toImport.isEmpty()) {
+            return;
         }
 
-        allDownloadSpecs.stream()
-                
.map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
-                .forEach(localKeyedStateHandles::add);
-
-        // Choose the best state handle for the initial DB
-        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
-                
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold);
-        Preconditions.checkNotNull(selectedInitialHandle);
-        // Remove the selected handle from the list so that we don't restore 
it twice.
-        localKeyedStateHandles.remove(selectedInitialHandle);
-
-        try {
-            // Process all state downloads
-            transferRemoteStateToLocalDirectory(allDownloadSpecs);
-
-            // Init the base DB instance with the initial state
-            initBaseDBForRescaling(selectedInitialHandle);
-
-            // Transfer remaining key-groups from temporary instance into base 
DB
-            byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getStartKeyGroup(), 
startKeyGroupPrefixBytes);
-
-            byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-            CompositeKeySerializationUtils.serializeKeyGroup(
-                    keyGroupRange.getEndKeyGroup() + 1, 
stopKeyGroupPrefixBytes);
-
-            // Insert all remaining state through creating temporary RocksDB 
instances
-            for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
-                logger.info(
-                        "Starting to restore from state handle: {} with 
rescaling.", stateHandle);
-
-                try (RestoredDBInstance tmpRestoreDBInfo =
-                                
restoreTempDBInstanceFromLocalState(stateHandle);
-                        RocksDBWriteBatchWrapper writeBatchWrapper =
-                                new RocksDBWriteBatchWrapper(
-                                        this.rocksHandle.getDb(), 
writeBatchSize)) {
-
-                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                            tmpRestoreDBInfo.columnFamilyDescriptors;
-                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                            tmpRestoreDBInfo.columnFamilyHandles;
-
-                    // iterating only the requested descriptors automatically 
skips the default
-                    // column
-                    // family handle
-                    for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                        ColumnFamilyHandle tmpColumnFamilyHandle =
-                                tmpColumnFamilyHandles.get(descIdx);
-
-                        ColumnFamilyHandle targetColumnFamilyHandle =
-                                
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                                null,
-                                                
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
-                                                        descIdx))
-                                        .columnFamilyHandle;
-
-                        try (RocksIteratorWrapper iterator =
-                                RocksDBOperationUtils.getRocksIterator(
-                                        tmpRestoreDBInfo.db,
-                                        tmpColumnFamilyHandle,
-                                        tmpRestoreDBInfo.readOptions)) {
-
-                            iterator.seek(startKeyGroupPrefixBytes);
-
-                            while (iterator.isValid()) {
-
-                                if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                        iterator.key(), 
stopKeyGroupPrefixBytes)) {
-                                    writeBatchWrapper.put(
-                                            targetColumnFamilyHandle,
-                                            iterator.key(),
-                                            iterator.value());
-                                } else {
-                                    // Since the iterator will visit the 
record according to the
-                                    // sorted
-                                    // order,
-                                    // we can just break here.
-                                    break;
-                                }
-
-                                iterator.next();
-                            }
-                        } // releases native iterator resources
-                    }
-                    logger.info(
-                            "Finished restoring from state handle: {} with 
rescaling.",
-                            stateHandle);
+        logger.info(
+                "Starting to copy state handles for backend with range {} in 
operator {} using temporary instances.",
+                keyGroupRange,
+                operatorIdentifier);
+
+        try (RocksDBWriteBatchWrapper writeBatchWrapper =
+                new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), 
writeBatchSize)) {
+            for (IncrementalLocalKeyedStateHandle handleToCopy : toImport) {
+                try (RestoredDBInstance restoredDBInstance =
+                        restoreTempDBInstanceFromLocalState(handleToCopy)) {
+                    copyTempDbIntoBaseDb(
+                            restoredDBInstance,
+                            writeBatchWrapper,
+                            startKeyGroupPrefixBytes,
+                            stopKeyGroupPrefixBytes);
                 }
             }
-        } finally {
-            // Cleanup all download directories
-            allDownloadSpecs.stream()
-                    .map(StateHandleDownloadSpec::getDownloadDestination)
-                    .forEach(this::cleanUpPathQuietly);
         }
+
+        logger.info(
+                "Competed copying state handles for backend with range {} in 
operator {} using temporary instances.",
+                keyGroupRange,
+                operatorIdentifier);
     }
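
RocksDBWriteBatchWrapper is Flink's helper that buffers puts and flushes them
once writeBatchSize is reached; in plain RocksJava terms each flush boils down
to something like the following sketch:

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    final class WriteBatchSketch {
        // Buffers one put and writes the batch in a single call; the real
        // wrapper accumulates many puts per batch before flushing.
        static void putAndFlush(RocksDB db, ColumnFamilyHandle cf, byte[] key, byte[] value)
                throws RocksDBException {
            try (WriteBatch batch = new WriteBatch();
                    WriteOptions options = new WriteOptions()) {
                batch.put(cf, key, value);
                db.write(options, batch);
            }
        }
    }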
 
     /**
-     * Recovery from multi incremental states with rescaling. For rescaling, 
this method creates a
-     * temporary RocksDB instance for a key-groups shard. All contents from 
the temporary instance
-     * are copied into the real restore instance and then the temporary 
instance is discarded.
+     * Helper method to copy all data from an open temporary DB to the base DB.
+     *
+     * @param tmpRestoreDBInfo the temporary instance.
+     * @param writeBatchWrapper write batch wrapper for writes against the 
base DB.
+     * @param startKeyGroupPrefixBytes the min/start key of the key groups 
range as bytes.
+     * @param stopKeyGroupPrefixBytes the max+1/end key of the key groups 
range as bytes.
+     * @throws Exception on any copy error.
      */
-    private void restoreWithIngestDbMode(Collection<KeyedStateHandle> 
restoreStateHandles)
+    private void copyTempDbIntoBaseDb(
+            RestoredDBInstance tmpRestoreDBInfo,
+            RocksDBWriteBatchWrapper writeBatchWrapper,
+            byte[] startKeyGroupPrefixBytes,
+            byte[] stopKeyGroupPrefixBytes)
             throws Exception {
 
-        Preconditions.checkArgument(restoreStateHandles != null && 
!restoreStateHandles.isEmpty());
-
-        Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs =
-                
CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size());
-
-        final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
-        final Path exportCfBasePath = 
absolutInstanceBasePath.resolve("export-cfs");
-        Files.createDirectories(exportCfBasePath);
-
-        // Open base db as Empty DB
-        this.rocksHandle.openDB();
-
-        // Prepare and collect all the download request to pull remote state 
to a local directory
-        for (KeyedStateHandle stateHandle : restoreStateHandles) {
-            if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
-                throw unexpectedStateHandleException(
-                        IncrementalRemoteKeyedStateHandle.class, 
stateHandle.getClass());
-            }
-            StateHandleDownloadSpec downloadRequest =
-                    new StateHandleDownloadSpec(
-                            (IncrementalRemoteKeyedStateHandle) stateHandle,
-                            
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
-            allDownloadSpecs.put(stateHandle.getStateHandleId(), 
downloadRequest);
-        }
-
-        // Process all state downloads
-        transferRemoteStateToLocalDirectory(allDownloadSpecs.values());
-
-        // Transfer remaining key-groups from temporary instance into base DB
-        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
-
-        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
-
-        HashMap<RegisteredStateMetaInfoBase, List<ExportImportFilesMetaData>> 
cfMetaDataToImport =
-                new HashMap();
-
-        // Insert all remaining state through creating temporary RocksDB 
instances
-        for (StateHandleDownloadSpec downloadRequest : 
allDownloadSpecs.values()) {
-            logger.info(
-                    "Starting to restore from state handle: {} with 
rescaling.",
-                    downloadRequest.getStateHandle());
-
-            try (RestoredDBInstance tmpRestoreDBInfo =
-                    restoreTempDBInstanceFromDownloadedState(downloadRequest)) 
{
-
-                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                        tmpRestoreDBInfo.columnFamilyHandles;
-
-                // Clip all tmp db to Range [startKeyGroupPrefixBytes, 
stopKeyGroupPrefixBytes)
-                RocksDBIncrementalCheckpointUtils.clipColumnFamilies(
-                        tmpRestoreDBInfo.db,
-                        tmpColumnFamilyHandles,
-                        startKeyGroupPrefixBytes,
-                        stopKeyGroupPrefixBytes);
-
-                // Export all the Column Families
-                Map<RegisteredStateMetaInfoBase, ExportImportFilesMetaData> 
exportedCFAndMetaData =
-                        RocksDBIncrementalCheckpointUtils.exportColumnFamilies(
-                                tmpRestoreDBInfo.db,
-                                tmpColumnFamilyHandles,
-                                tmpRestoreDBInfo.stateMetaInfoSnapshots,
-                                exportCfBasePath);
-
-                exportedCFAndMetaData.forEach(
-                        (stateMeta, cfMetaData) -> {
-                            if (!cfMetaData.files().isEmpty()) {
-                                cfMetaDataToImport.putIfAbsent(stateMeta, new 
ArrayList<>());
-                                
cfMetaDataToImport.get(stateMeta).add(cfMetaData);
-                            }
-                        });
-            } finally {
-                cleanUpPathQuietly(downloadRequest.getDownloadDestination());
-            }
-        }
+        logger.debug(
+                "Starting copy of state handle {} for backend with range {} in 
operator {} to base DB using temporary instance.",
+                tmpRestoreDBInfo.srcStateHandle,
+                keyGroupRange,
+                operatorIdentifier);
+
+        List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                tmpRestoreDBInfo.columnFamilyDescriptors;
+        List<ColumnFamilyHandle> tmpColumnFamilyHandles = 
tmpRestoreDBInfo.columnFamilyHandles;
+
+        // Iterating only the requested descriptors automatically skips the default column family handle.
+        for (int descIdx = 0; descIdx < tmpColumnFamilyDescriptors.size(); 
++descIdx) {
+            ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
+
+            ColumnFamilyHandle targetColumnFamilyHandle =
+                    this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                    null, 
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
+                            .columnFamilyHandle;
+
+            try (RocksIteratorWrapper iterator =
+                    RocksDBOperationUtils.getRocksIterator(
+                            tmpRestoreDBInfo.db,
+                            tmpColumnFamilyHandle,
+                            tmpRestoreDBInfo.readOptions)) {
+
+                iterator.seek(startKeyGroupPrefixBytes);
+
+                while (iterator.isValid()) {
+
+                    if (RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                            iterator.key(), stopKeyGroupPrefixBytes)) {
+                        writeBatchWrapper.put(
+                                targetColumnFamilyHandle, iterator.key(), 
iterator.value());
+                    } else {
+                        // Since the iterator will visit the record according 
to the
+                        // sorted
+                        // order,
+                        // we can just break here.
+                        break;
+                    }
 
-        try {
-            
cfMetaDataToImport.forEach(this.rocksHandle::registerStateColumnFamilyHandleWithImport);
-        } finally {
-            cleanUpPathQuietly(exportCfBasePath);
+                    iterator.next();
+                }
+            } // releases native iterator resources
         }
+        logger.debug(
+                "Finished copy of state handle {} for backend with range {} in 
operator {} using temporary instance.",
+                tmpRestoreDBInfo.srcStateHandle,
+                keyGroupRange,
+                operatorIdentifier);
     }
 
-    private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle 
stateHandle)
-            throws Exception {
-
-        // 1. Restore base DB from selected initial handle
-        restoreBaseDBFromLocalState(stateHandle);
-
-        // 2. Clip the base DB instance
+    private void cleanUpPathQuietly(@Nonnull Path path) {
         try {
-            RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
-                    this.rocksHandle.getDb(),
-                    this.rocksHandle.getColumnFamilyHandles(),
-                    keyGroupRange,
-                    stateHandle.getKeyGroupRange(),
-                    keyGroupPrefixBytes);
-        } catch (RocksDBException e) {
-            String errMsg = "Failed to clip DB after initialization.";
-            logger.error(errMsg, e);
-            throw new BackendBuildingException(errMsg, e);
+            FileUtils.deleteDirectory(path.toFile());
+        } catch (IOException ex) {
+            logger.warn("Failed to clean up path " + path, ex);
         }
     }
 
@@ -544,17 +825,21 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         private final ReadOptions readOptions;
 
+        private final IncrementalLocalKeyedStateHandle srcStateHandle;
+
         private RestoredDBInstance(
                 @Nonnull RocksDB db,
                 @Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
                 @Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
-                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
+                @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
+                @Nonnull IncrementalLocalKeyedStateHandle srcStateHandle) {
             this.db = db;
             this.defaultColumnFamilyHandle = columnFamilyHandles.remove(0);
             this.columnFamilyHandles = columnFamilyHandles;
             this.columnFamilyDescriptors = columnFamilyDescriptors;
             this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
             this.readOptions = new ReadOptions();
+            this.srcStateHandle = srcStateHandle;
         }
 
         @Override
@@ -596,12 +881,16 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                         this.rocksHandle.getDbOptions());
 
         return new RestoredDBInstance(
-                restoreDb, columnFamilyHandles, columnFamilyDescriptors, 
stateMetaInfoSnapshots);
+                restoreDb,
+                columnFamilyHandles,
+                columnFamilyDescriptors,
+                stateMetaInfoSnapshots,
+                stateHandle);
     }
 
     /**
      * This method recreates and registers all {@link ColumnFamilyDescriptor} 
from Flink's state
-     * meta data snapshot.
+     * metadata snapshot.
      */
     private List<ColumnFamilyDescriptor> createColumnFamilyDescriptors(
             List<StateMetaInfoSnapshot> stateMetaInfoSnapshots, boolean 
registerTtlCompactFilter) {
@@ -612,6 +901,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         for (StateMetaInfoSnapshot stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
             RegisteredStateMetaInfoBase metaInfoBase =
                     
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
+
             ColumnFamilyDescriptor columnFamilyDescriptor =
                     RocksDBOperationUtils.createColumnFamilyDescriptor(
                             metaInfoBase,
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
index 4202c899c59..b12d6ff1d15 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java
@@ -66,6 +66,7 @@ public class RocksDBNoneRestoreOperation<K> implements 
RocksDBRestoreOperation {
                 this.rocksHandle.getNativeMetricMonitor(),
                 -1,
                 null,
+                null,
                 null);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
index ad17b6f2769..1e458eb45f4 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
@@ -24,9 +24,13 @@ import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
+import java.util.Optional;
 import java.util.SortedMap;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 
 /** Entity holding result of RocksDB instance restore. */
 public class RocksDBRestoreResult {
@@ -39,19 +43,23 @@ public class RocksDBRestoreResult {
     private final UUID backendUID;
     private final SortedMap<Long, Collection<HandleAndLocalPath>> 
restoredSstFiles;
 
+    private final CompletableFuture<Void> asyncCompactAfterRestoreFuture;
+
     public RocksDBRestoreResult(
             RocksDB db,
             ColumnFamilyHandle defaultColumnFamilyHandle,
             RocksDBNativeMetricMonitor nativeMetricMonitor,
             long lastCompletedCheckpointId,
             UUID backendUID,
-            SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles) {
+            SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles,
+            @Nullable CompletableFuture<Void> asyncCompactAfterRestoreFuture) {
         this.db = db;
         this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
         this.nativeMetricMonitor = nativeMetricMonitor;
         this.lastCompletedCheckpointId = lastCompletedCheckpointId;
         this.backendUID = backendUID;
         this.restoredSstFiles = restoredSstFiles;
+        this.asyncCompactAfterRestoreFuture = asyncCompactAfterRestoreFuture;
     }
 
     public RocksDB getDb() {
@@ -77,4 +85,8 @@ public class RocksDBRestoreResult {
     public RocksDBNativeMetricMonitor getNativeMetricMonitor() {
         return nativeMetricMonitor;
     }
+
+    public Optional<CompletableFuture<Void>> 
getAsyncCompactAfterRestoreFuture() {
+        return Optional.ofNullable(asyncCompactAfterRestoreFuture);
+    }
 }
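
A hypothetical caller that must not consider the restore fully finished before
the background compaction completes could use the new accessor like this
(caller-side sketch, not part of this commit):

    import java.util.concurrent.CompletableFuture;

    final class RestoreCallerSketch {
        // Blocks until the optional post-rescale compaction has completed.
        static void awaitAsyncCompaction(RocksDBRestoreResult restoreResult) {
            restoreResult
                    .getAsyncCompactAfterRestoreFuture()
                    .ifPresent(CompletableFuture::join);
        }
    }
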
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 03b1a087e28..d41b166e3c9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -86,6 +86,7 @@ import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Matchers.any;
@@ -116,7 +117,14 @@ public class EmbeddedRocksDBStateBackendTest
                     {
                         true,
                         (SupplierWithException<CheckpointStorage, IOException>)
-                                JobManagerCheckpointStorage::new
+                                JobManagerCheckpointStorage::new,
+                        false
+                    },
+                    {
+                        true,
+                        (SupplierWithException<CheckpointStorage, IOException>)
+                                JobManagerCheckpointStorage::new,
+                        true
                     },
                     {
                         false,
@@ -126,7 +134,8 @@ public class EmbeddedRocksDBStateBackendTest
                                             
TempDirUtils.newFolder(tempFolder).toURI().toString();
                                     return new FileSystemCheckpointStorage(
                                             new Path(checkpointPath), 0, -1);
-                                }
+                                },
+                        false
                     }
                 });
     }
@@ -137,6 +146,9 @@ public class EmbeddedRocksDBStateBackendTest
     @Parameter(value = 1)
     public SupplierWithException<CheckpointStorage, IOException> 
storageSupplier;
 
+    @Parameter(value = 2)
+    public boolean useIngestDB;
+
     // Store it because we need it for the cleanup test.
     private String dbPath;
     private RocksDB db = null;
@@ -168,6 +180,7 @@ public class EmbeddedRocksDBStateBackendTest
         EmbeddedRocksDBStateBackend backend =
                 new 
EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
         Configuration configuration = new Configuration();
+        configuration.setBoolean(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
         configuration.set(
                 RocksDBOptions.TIMER_SERVICE_FACTORY,
                 EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
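
For context, the test toggle mirrors what a user job would set to opt into the
new restore path (option names as introduced by this change; a configuration
sketch, not a complete job):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;

    final class IngestDbRestoreConfigSketch {
        static EmbeddedRocksDBStateBackend configuredBackend() {
            Configuration config = new Configuration();
            config.setBoolean(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
            config.setBoolean(
                    RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
                    true);
            // Incremental checkpoints are a prerequisite for incremental restore.
            EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
            return backend.configure(
                    config, IngestDbRestoreConfigSketch.class.getClassLoader());
        }
    }
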
@@ -234,7 +247,8 @@ public class EmbeddedRocksDBStateBackendTest
                                 spy(db),
                                 defaultCFHandle,
                                 optionsContainer.getColumnOptions())
-                        
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing);
+                        
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
+                        .setUseIngestDbRestoreMode(useIngestDB);
 
         if (enableIncrementalCheckpointing) {
             rocksDBStateUploader =
@@ -315,19 +329,18 @@ public class EmbeddedRocksDBStateBackendTest
     @TestTemplate
     public void testCorrectMergeOperatorSet() throws Exception {
         prepareRocksDB();
-        final ColumnFamilyOptions columnFamilyOptions = spy(new 
ColumnFamilyOptions());
-        RocksDBKeyedStateBackend<Integer> test = null;
 
-        try {
-            test =
-                    RocksDBTestUtils.builderForTestDB(
-                                    TempDirUtils.newFolder(tempFolder),
-                                    IntSerializer.INSTANCE,
-                                    db,
-                                    defaultCFHandle,
-                                    columnFamilyOptions)
-                            
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
-                            .build();
+        try (ColumnFamilyOptions columnFamilyOptions = spy(new 
ColumnFamilyOptions());
+                RocksDBKeyedStateBackend<Integer> test =
+                        RocksDBTestUtils.builderForTestDB(
+                                        TempDirUtils.newFolder(tempFolder),
+                                        IntSerializer.INSTANCE,
+                                        db,
+                                        defaultCFHandle,
+                                        columnFamilyOptions)
+                                
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
+                                .setUseIngestDbRestoreMode(useIngestDB)
+                                .build()) {
 
             ValueStateDescriptor<String> stubState1 =
                     new ValueStateDescriptor<>("StubState-1", 
StringSerializer.INSTANCE);
@@ -339,12 +352,6 @@ public class EmbeddedRocksDBStateBackendTest
             // The default CF is pre-created so sum up to 2 times (once for 
each stub state)
             verify(columnFamilyOptions, Mockito.times(2))
                     
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
-        } finally {
-            if (test != null) {
-                IOUtils.closeQuietly(test);
-                test.dispose();
-            }
-            columnFamilyOptions.close();
         }
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
new file mode 100644
index 00000000000..4a233ec06ba
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRecoveryTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
+
+/** Rescaling test and microbenchmark for RocksDB. */
+public class RocksDBRecoveryTest {
+
+    // Swallows all output by default; assign System.out here instead for console output.
+    private static final PrintStream OUTPUT =
+            new PrintStream(
+                    new OutputStream() {
+                        @Override
+                        public void write(int b) {}
+                    });
+
+    @TempDir private static java.nio.file.Path tempFolder;
+
+    @Test
+    public void testScaleOut_1_2() throws Exception {
+        testRescale(1, 2, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleOut_2_8() throws Exception {
+        testRescale(2, 8, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleOut_2_7() throws Exception {
+        testRescale(2, 7, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleIn_2_1() throws Exception {
+        testRescale(2, 1, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleIn_8_2() throws Exception {
+        testRescale(8, 2, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleIn_7_2() throws Exception {
+        testRescale(7, 2, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleOut_2_3() throws Exception {
+        testRescale(2, 3, 100_000, 10);
+    }
+
+    @Test
+    public void testScaleIn_3_2() throws Exception {
+        testRescale(3, 2, 100_000, 10);
+    }
+
+    public void testRescale(
+            int startParallelism, int targetParallelism, int numKeys, int updateDistance)
+            throws Exception {
+
+        OUTPUT.println("Rescaling from " + startParallelism + " to " + 
targetParallelism + "...");
+        final String stateName = "TestValueState";
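+        // The product is a convenient max parallelism: both the start and the target
+        // parallelism divide it, so key groups distribute evenly in both configurations.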
+        final int maxParallelism = startParallelism * targetParallelism;
+        final List<RocksDBKeyedStateBackend<Integer>> backends = new ArrayList<>(maxParallelism);
+        final List<SnapshotResult<KeyedStateHandle>> startSnapshotResult = new ArrayList<>();
+        final List<SnapshotResult<KeyedStateHandle>> rescaleSnapshotResult = new ArrayList<>();
+        final List<SnapshotResult<KeyedStateHandle>> cleanupSnapshotResult = new ArrayList<>();
+        try {
+            final List<ValueState<Integer>> valueStates = new ArrayList<>(maxParallelism);
+            try {
+                ValueStateDescriptor<Integer> stateDescriptor =
+                        new ValueStateDescriptor<>(stateName, IntSerializer.INSTANCE);
+
+                for (int i = 0; i < startParallelism; ++i) {
+                    RocksDBKeyedStateBackend<Integer> backend =
+                            RocksDBTestUtils.builderForTestDefaults(
+                                            TempDirUtils.newFolder(tempFolder),
+                                            IntSerializer.INSTANCE,
+                                            maxParallelism,
+                                            KeyGroupRangeAssignment
+                                                    .computeKeyGroupRangeForOperatorIndex(
+                                                            maxParallelism, startParallelism, i),
+                                            Collections.emptyList())
+                                    .setEnableIncrementalCheckpointing(true)
+                                    .setUseIngestDbRestoreMode(true)
+                                    .build();
+
+                    valueStates.add(
+                            backend.getOrCreateKeyedState(
+                                    VoidNamespaceSerializer.INSTANCE, stateDescriptor));
+
+                    backends.add(backend);
+                }
+
+                OUTPUT.println("Inserting " + numKeys + " keys...");
+
+                for (int i = 1; i <= numKeys; ++i) {
+                    int key = i;
+                    int index =
+                            KeyGroupRangeAssignment.assignKeyToParallelOperator(
+                                    key, maxParallelism, startParallelism);
+                    backends.get(index).setCurrentKey(key);
+                    valueStates.get(index).update(i);
+
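+                    // Every updateDistance-th insert also overwrites an earlier key, so
+                    // the snapshots contain updated entries and not only unique inserts.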
+                    if (updateDistance > 0 && i % updateDistance == 0) {
+                        key = i - updateDistance + 1;
+                        index =
+                                KeyGroupRangeAssignment.assignKeyToParallelOperator(
+                                        key, maxParallelism, startParallelism);
+                        backends.get(index).setCurrentKey(key);
+                        valueStates.get(index).update(i);
+                    }
+                }
+
+                OUTPUT.println("Creating snapshots...");
+                snapshotAllBackends(backends, startSnapshotResult);
+            } finally {
+                for (RocksDBKeyedStateBackend<Integer> backend : backends) {
+                    IOUtils.closeQuietly(backend);
+                    backend.dispose();
+                }
+                valueStates.clear();
+                backends.clear();
+            }
+
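+            // Restore under every combination of ingestDB restore mode and async
+            // compaction after rescale; each pass also rescales back to the start parallelism.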
+            for (boolean useIngest : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) {
+                for (boolean asyncCompact : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) {
+
+                    // Rescale start -> target
+                    rescaleAndRestoreBackends(
+                            useIngest,
+                            asyncCompact,
+                            targetParallelism,
+                            maxParallelism,
+                            startSnapshotResult,
+                            backends);
+
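+                    // Block until any async compaction triggered by the rescaled restore completes.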
+                    backends.forEach(
+                            backend ->
+                                    backend.getAsyncCompactAfterRestoreFuture()
+                                            .ifPresent(
+                                                    future -> {
+                                                        try {
+                                                            future.get();
+                                                        } catch (Exception e) {
+                                                            throw new RuntimeException(e);
+                                                        }
+                                                    }));
+
+                    snapshotAllBackends(backends, rescaleSnapshotResult);
+
+                    int count = 0;
+                    for (RocksDBKeyedStateBackend<Integer> backend : backends) {
+                        count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count();
+                        IOUtils.closeQuietly(backend);
+                        backend.dispose();
+                    }
+                    Assertions.assertEquals(numKeys, count);
+                    backends.clear();
+                    cleanupSnapshotResult.addAll(rescaleSnapshotResult);
+
+                    // Rescale reverse: target -> start
+                    rescaleAndRestoreBackends(
+                            useIngest,
+                            false,
+                            startParallelism,
+                            maxParallelism,
+                            rescaleSnapshotResult,
+                            backends);
+
+                    count = 0;
+                    for (RocksDBKeyedStateBackend<Integer> backend : backends) {
+                        count += backend.getKeys(stateName, VoidNamespace.INSTANCE).count();
+                        IOUtils.closeQuietly(backend);
+                        backend.dispose();
+                    }
+                    Assertions.assertEquals(numKeys, count);
+                    rescaleSnapshotResult.clear();
+                    backends.clear();
+                }
+            }
+        } finally {
+            for (RocksDBKeyedStateBackend<Integer> backend : backends) {
+                IOUtils.closeQuietly(backend);
+                backend.dispose();
+            }
+            for (SnapshotResult<KeyedStateHandle> snapshotResult : startSnapshotResult) {
+                snapshotResult.discardState();
+            }
+            for (SnapshotResult<KeyedStateHandle> snapshotResult : rescaleSnapshotResult) {
+                snapshotResult.discardState();
+            }
+            for (SnapshotResult<KeyedStateHandle> snapshotResult : cleanupSnapshotResult) {
+                snapshotResult.discardState();
+            }
+        }
+    }
+
+    private void rescaleAndRestoreBackends(
+            boolean useIngest,
+            boolean asyncCompactAfterRescale,
+            int targetParallelism,
+            int maxParallelism,
+            List<SnapshotResult<KeyedStateHandle>> snapshotResult,
+            List<RocksDBKeyedStateBackend<Integer>> backendsOut)
+            throws IOException {
+
+        List<KeyedStateHandle> stateHandles =
+                extractKeyedStateHandlesFromSnapshotResult(snapshotResult);
+        List<KeyGroupRange> ranges = computeKeyGroupRanges(targetParallelism, maxParallelism);
+        List<List<KeyedStateHandle>> handlesByInstance =
+                computeHandlesByInstance(stateHandles, ranges, targetParallelism);
+
+        OUTPUT.println(
+                "Restoring using ingestDb="
+                        + useIngest
+                        + ", asyncCompact="
+                        + asyncCompactAfterRescale
+                        + "... ");
+
+        OUTPUT.println(
+                "Sum of snapshot sizes: "
+                        + stateHandles.stream().mapToLong(StateObject::getStateSize).sum()
+                                / (1024 * 1024)
+                        + " MB");
+
+        long maxInstanceTime = Long.MIN_VALUE;
+        long t = System.currentTimeMillis();
+        for (int i = 0; i < targetParallelism; ++i) {
+            List<KeyedStateHandle> instanceHandles = handlesByInstance.get(i);
+            long tInstance = System.currentTimeMillis();
+            RocksDBKeyedStateBackend<Integer> backend =
+                    RocksDBTestUtils.builderForTestDefaults(
+                                    TempDirUtils.newFolder(tempFolder),
+                                    IntSerializer.INSTANCE,
+                                    maxParallelism,
+                                    ranges.get(i),
+                                    instanceHandles)
+                            .setEnableIncrementalCheckpointing(true)
+                            .setUseIngestDbRestoreMode(useIngest)
+                            .setIncrementalRestoreAsyncCompactAfterRescale(asyncCompactAfterRescale)
+                            .build();
+
+            long instanceTime = System.currentTimeMillis() - tInstance;
+            if (instanceTime > maxInstanceTime) {
+                maxInstanceTime = instanceTime;
+            }
+
+            OUTPUT.println(
+                    "    Restored instance "
+                            + i
+                            + " from "
+                            + instanceHandles.size()
+                            + " state handles"
+                            + " time (ms): "
+                            + instanceTime);
+
+            backendsOut.add(backend);
+        }
+        OUTPUT.println("Total restore time (ms): " + 
(System.currentTimeMillis() - t));
+        OUTPUT.println("Max restore time (ms): " + maxInstanceTime);
+    }
+
+    private void snapshotAllBackends(
+            List<RocksDBKeyedStateBackend<Integer>> backends,
+            List<SnapshotResult<KeyedStateHandle>> snapshotResultsOut)
+            throws Exception {
+        for (int i = 0; i < backends.size(); ++i) {
+            RocksDBKeyedStateBackend<Integer> backend = backends.get(i);
+            FsCheckpointStreamFactory fsCheckpointStreamFactory =
+                    new FsCheckpointStreamFactory(
+                            getSharedInstance(),
+                            fromLocalFile(
+                                    TempDirUtils.newFolder(
+                                            tempFolder, "checkpointsDir_" + UUID.randomUUID() + i)),
+                            fromLocalFile(
+                                    TempDirUtils.newFolder(
+                                            tempFolder, "sharedStateDir_" + UUID.randomUUID() + i)),
+                            1,
+                            4096);
+
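+            // Run the snapshot future synchronously and collect the materialized handle.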
+            RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot =
+                    backend.snapshot(
+                            0L,
+                            0L,
+                            fsCheckpointStreamFactory,
+                            CheckpointOptions.forCheckpointWithDefaultLocation());
+
+            snapshot.run();
+            snapshotResultsOut.add(snapshot.get());
+        }
+    }
+
+    private List<KeyedStateHandle> extractKeyedStateHandlesFromSnapshotResult(
+            List<SnapshotResult<KeyedStateHandle>> snapshotResults) {
+        return snapshotResults.stream()
+                .map(SnapshotResult::getJobManagerOwnedSnapshot)
+                .collect(Collectors.toList());
+    }
+
+    private List<KeyGroupRange> computeKeyGroupRanges(int restoreParallelism, int maxParallelism) {
+        List<KeyGroupRange> ranges = new ArrayList<>(restoreParallelism);
+        for (int i = 0; i < restoreParallelism; ++i) {
+            ranges.add(
+                    KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                            maxParallelism, restoreParallelism, i));
+        }
+        return ranges;
+    }
+
+    private List<List<KeyedStateHandle>> computeHandlesByInstance(
+            List<KeyedStateHandle> stateHandles,
+            List<KeyGroupRange> computedRanges,
+            int restoreParallelism) {
+        List<List<KeyedStateHandle>> handlesByInstance = new ArrayList<>(restoreParallelism);
+        for (KeyGroupRange targetRange : computedRanges) {
+            List<KeyedStateHandle> handlesForTargetRange = new ArrayList<>(1);
+            handlesByInstance.add(handlesForTargetRange);
+
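+            // A handle is assigned to an instance if its key-group range overlaps the target range.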
+            for (KeyedStateHandle stateHandle : stateHandles) {
+                if (stateHandle.getKeyGroupRange().getIntersection(targetRange)
+                        != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
+                    handlesForTargetRange.add(stateHandle);
+                }
+            }
+        }
+        return handlesByInstance;
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 34d27e1bb59..d54210e727e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -872,6 +872,49 @@ public class RocksDBStateBackendConfigTest {
         assertTrue(0.3 == rocksDBStateBackend.getOverlapFractionThreshold());
     }
 
+    @Test
+    public void testDefaultUseIngestDB() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        assertEquals(
+                RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue(),
+                rocksDBStateBackend.getUseIngestDbRestoreMode());
+    }
+
+    @Test
+    public void testConfigureUseIngestDB() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        Configuration configuration = new Configuration();
+        configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
+        rocksDBStateBackend =
+                rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
+        assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode());
+    }
+
+    @Test
+    public void testDefaultIncrementalRestoreAsyncCompactAfterRescale() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        assertEquals(
+                RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
+                        .defaultValue(),
+                rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale());
+    }
+
+    @Test
+    public void testConfigureIncrementalRestoreAsyncCompactAfterRescale() {
+        EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
+        Configuration configuration = new Configuration();
+        boolean notDefault =
+                !RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE
+                        .defaultValue();
+        configuration.set(
+                RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
+                notDefault);
+        rocksDBStateBackend =
+                rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
+        assertEquals(
+                notDefault, rocksDBStateBackend.getIncrementalRestoreAsyncCompactAfterRescale());
+    }
+
     private void verifySetParameter(Runnable setter) {
         try {
             setter.run();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
index 2fc862664ce..43111df7582 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
@@ -37,8 +38,11 @@ import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.RocksDB;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 
 /** Test utils for the RocksDB state backend. */
@@ -50,13 +54,34 @@ public final class RocksDBTestUtils {
         return builderForTestDefaults(
                 instanceBasePath,
                 keySerializer,
-                EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
+                2,
+                new KeyGroupRange(0, 1),
+                Collections.emptyList());
     }
 
     public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
             File instanceBasePath,
             TypeSerializer<K> keySerializer,
-            EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType) {
+            int numKeyGroups,
+            KeyGroupRange keyGroupRange,
+            @Nonnull Collection<KeyedStateHandle> stateHandles) {
+
+        return builderForTestDefaults(
+                instanceBasePath,
+                keySerializer,
+                EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP,
+                numKeyGroups,
+                keyGroupRange,
+                stateHandles);
+    }
+
+    public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
+            File instanceBasePath,
+            TypeSerializer<K> keySerializer,
+            EmbeddedRocksDBStateBackend.PriorityQueueStateType queueStateType,
+            int numKeyGroups,
+            KeyGroupRange keyGroupRange,
+            @Nonnull Collection<KeyedStateHandle> stateHandles) {
 
         final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();
 
@@ -68,8 +93,8 @@ public final class RocksDBTestUtils {
                 stateName -> optionsContainer.getColumnOptions(),
                 new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
                 keySerializer,
-                2,
-                new KeyGroupRange(0, 1),
+                numKeyGroups,
+                keyGroupRange,
                 new ExecutionConfig(),
                 TestLocalRecoveryConfig.disabled(),
                 RocksDBPriorityQueueConfig.buildWithPriorityQueueType(queueStateType),
@@ -77,7 +102,7 @@ public final class RocksDBTestUtils {
                 LatencyTrackingStateConfig.disabled(),
                 new UnregisteredMetricsGroup(),
                 (key, value) -> {},
-                Collections.emptyList(),
+                stateHandles,
                 UncompressedStreamCompressionDecorator.INSTANCE,
                 new CloseableRegistry());
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
index 7e253cd2cc6..38b8d3e8c92 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -63,7 +64,7 @@ public class RocksIncrementalCheckpointRescalingTest extends TestLogger {
 
     private final int maxParallelism = 10;
 
-    private KeySelector<String, String> keySelector = new TestKeySelector();
+    private final KeySelector<String, String> keySelector = new TestKeySelector();
 
     private String[] records;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
index 9b95c8aa41c..ba13faca294 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -109,23 +110,29 @@ public class AutoRescalingITCase extends TestLogger {
     private static final int slotsPerTaskManager = 2;
     private static final int totalSlots = numTaskManagers * slotsPerTaskManager;
 
-    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}")
+    @Parameterized.Parameters(name = "backend = {0}, buffersPerChannel = {1}, useIngestDB = {2}")
     public static Collection<Object[]> data() {
         return Arrays.asList(
                 new Object[][] {
-                    {"rocksdb", 0}, {"rocksdb", 2}, {"filesystem", 0}, {"filesystem", 2}
+                    {"rocksdb", 0, false},
+                    {"rocksdb", 2, true},
+                    {"filesystem", 0, false},
+                    {"filesystem", 2, false}
                 });
     }
 
-    public AutoRescalingITCase(String backend, int buffersPerChannel) {
+    public AutoRescalingITCase(String backend, int buffersPerChannel, boolean useIngestDB) {
         this.backend = backend;
         this.buffersPerChannel = buffersPerChannel;
+        this.useIngestDB = useIngestDB;
     }
 
     private final String backend;
 
     private final int buffersPerChannel;
 
+    private final boolean useIngestDB;
+
     private String currentBackend = null;
 
     enum OperatorCheckpointMethod {
@@ -154,6 +161,7 @@ public class AutoRescalingITCase extends TestLogger {
             final File savepointDir = temporaryFolder.newFolder();
 
             config.set(StateBackendOptions.STATE_BACKEND, currentBackend);
+            config.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDB);
             config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
             config.set(CheckpointingOptions.LOCAL_RECOVERY, true);
             config.set(
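
For reference, a minimal sketch (not part of the patch) of how the two restore options exercised by the tests above can be enabled through the configuration. The option and method names are taken from this diff; the surrounding setup is assumed boilerplate:

    Configuration configuration = new Configuration();
    configuration.set(RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, true);
    configuration.set(
            RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, true);
    // configure(...) returns a re-configured copy of the backend, as used in
    // RocksDBStateBackendConfigTest above.
    EmbeddedRocksDBStateBackend backend =
            new EmbeddedRocksDBStateBackend(true)
                    .configure(configuration, EmbeddedRocksDBStateBackend.class.getClassLoader());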