This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 05cdb299e26db931a328bdc4e536dfb7effdb905
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Wed Oct 25 11:50:55 2023 +0200

    [FLINK-33341][state] Add support for rescaling from local keyed state to 
RocksDBIncrementalRestoreOperation.
---
 .../state/RocksDBIncrementalCheckpointUtils.java   |   8 +-
 .../streaming/state/StateHandleDownloadSpec.java   |  12 ++
 .../RocksDBIncrementalRestoreOperation.java        | 233 ++++++++++-----------
 3 files changed, 129 insertions(+), 124 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
index 54121709876..a835d10c481 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java
@@ -175,14 +175,14 @@ public class RocksDBIncrementalCheckpointUtils {
      * @return The best candidate or null if no candidate was a good fit.
      */
     @Nullable
-    public static KeyedStateHandle chooseTheBestStateHandleForInitial(
-            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
+    public static <T extends KeyedStateHandle> T 
chooseTheBestStateHandleForInitial(
+            @Nonnull Collection<T> restoreStateHandles,
             @Nonnull KeyGroupRange targetKeyGroupRange,
             double overlapFractionThreshold) {
 
-        KeyedStateHandle bestStateHandle = null;
+        T bestStateHandle = null;
         Score bestScore = Score.MIN;
-        for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
+        for (T rawStateHandle : restoreStateHandles) {
             Score handleScore =
                     stateHandleEvaluator(
                             rawStateHandle, targetKeyGroupRange, 
overlapFractionThreshold);
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
index 93a33fdc6fa..5f37f84921f 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 
 import java.nio.file.Path;
@@ -46,4 +48,14 @@ public class StateHandleDownloadSpec {
     public Path getDownloadDestination() {
         return downloadDestination;
     }
+
+    public IncrementalLocalKeyedStateHandle 
createLocalStateHandleForDownloadedState() {
+        return new IncrementalLocalKeyedStateHandle(
+                stateHandle.getBackendIdentifier(),
+                stateHandle.getCheckpointId(),
+                new DirectoryStateHandle(downloadDestination),
+                stateHandle.getKeyGroupRange(),
+                stateHandle.getMetaDataStateHandle(),
+                stateHandle.getSharedState());
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index a62bbb4a70b..11d4756ae13 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -35,7 +35,6 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
-import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
@@ -44,11 +43,9 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
-import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
@@ -227,35 +224,12 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                 new StateHandleDownloadSpec(stateHandle, 
tmpRestoreInstancePath);
         try {
             
transferRemoteStateToLocalDirectory(Collections.singletonList(downloadRequest));
-            restoreBaseDBFromDownloadedState(downloadRequest);
+            
restoreBaseDBFromLocalState(downloadRequest.createLocalStateHandleForDownloadedState());
         } finally {
             cleanUpPathQuietly(downloadRequest.getDownloadDestination());
         }
     }
 
-    /**
-     * This helper method creates a {@link IncrementalLocalKeyedStateHandle} 
for state that was
-     * previously downloaded for a {@link IncrementalRemoteKeyedStateHandle} 
and then invokes the
-     * restore procedure for local state on the downloaded state.
-     *
-     * @param downloadedState the specification of a completed state download.
-     * @throws Exception for restore problems.
-     */
-    private void restoreBaseDBFromDownloadedState(StateHandleDownloadSpec 
downloadedState)
-            throws Exception {
-        // since we transferred all remote state to a local directory, we can 
use the same code
-        // as for local recovery.
-        IncrementalRemoteKeyedStateHandle stateHandle = 
downloadedState.getStateHandle();
-        restoreBaseDBFromLocalState(
-                new IncrementalLocalKeyedStateHandle(
-                        stateHandle.getBackendIdentifier(),
-                        stateHandle.getCheckpointId(),
-                        new 
DirectoryStateHandle(downloadedState.getDownloadDestination()),
-                        stateHandle.getKeyGroupRange(),
-                        stateHandle.getMetaDataStateHandle(),
-                        stateHandle.getSharedState()));
-    }
-
     /** Restores RocksDB instance from local state. */
     private void restoreBaseDBFromLocalState(IncrementalLocalKeyedStateHandle 
localKeyedStateHandle)
             throws Exception {
@@ -304,113 +278,133 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         Preconditions.checkArgument(restoreStateHandles != null && 
!restoreStateHandles.isEmpty());
 
-        Map<StateHandleID, StateHandleDownloadSpec> allDownloadSpecs =
-                
CollectionUtil.newHashMapWithExpectedSize(restoreStateHandles.size());
-
-        // Choose the best state handle for the initial DB
-        final KeyedStateHandle selectedInitialHandle =
-                
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
-                        restoreStateHandles, keyGroupRange, 
overlapFractionThreshold);
+        final List<StateHandleDownloadSpec> allDownloadSpecs = new 
ArrayList<>();
 
-        Preconditions.checkNotNull(selectedInitialHandle);
+        final List<IncrementalLocalKeyedStateHandle> localKeyedStateHandles =
+                new ArrayList<>(restoreStateHandles.size());
 
         final Path absolutInstanceBasePath = 
instanceBasePath.getAbsoluteFile().toPath();
 
         // Prepare and collect all the download request to pull remote state 
to a local directory
         for (KeyedStateHandle stateHandle : restoreStateHandles) {
-            if (!(stateHandle instanceof IncrementalRemoteKeyedStateHandle)) {
+            if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
+                StateHandleDownloadSpec downloadRequest =
+                        new StateHandleDownloadSpec(
+                                (IncrementalRemoteKeyedStateHandle) 
stateHandle,
+                                
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
+                allDownloadSpecs.add(downloadRequest);
+            } else if (stateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+                localKeyedStateHandles.add((IncrementalLocalKeyedStateHandle) 
stateHandle);
+            } else {
                 throw unexpectedStateHandleException(
                         IncrementalRemoteKeyedStateHandle.class, 
stateHandle.getClass());
             }
-            StateHandleDownloadSpec downloadRequest =
-                    new StateHandleDownloadSpec(
-                            (IncrementalRemoteKeyedStateHandle) stateHandle,
-                            
absolutInstanceBasePath.resolve(UUID.randomUUID().toString()));
-            allDownloadSpecs.put(stateHandle.getStateHandleId(), 
downloadRequest);
         }
 
-        // Process all state downloads
-        transferRemoteStateToLocalDirectory(allDownloadSpecs.values());
-
-        // Init the base DB instance with the initial state
-        
initBaseDBForRescaling(allDownloadSpecs.remove(selectedInitialHandle.getStateHandleId()));
-
-        // Transfer remaining key-groups from temporary instance into base DB
-        byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getStartKeyGroup(), startKeyGroupPrefixBytes);
-
-        byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
-        CompositeKeySerializationUtils.serializeKeyGroup(
-                keyGroupRange.getEndKeyGroup() + 1, stopKeyGroupPrefixBytes);
-
-        // Insert all remaining state through creating temporary RocksDB 
instances
-        for (StateHandleDownloadSpec downloadRequest : 
allDownloadSpecs.values()) {
-            logger.info(
-                    "Starting to restore from state handle: {} with 
rescaling.",
-                    downloadRequest.getStateHandle());
-
-            try (RestoredDBInstance tmpRestoreDBInfo =
-                            
restoreTempDBInstanceFromDownloadedState(downloadRequest);
-                    RocksDBWriteBatchWrapper writeBatchWrapper =
-                            new RocksDBWriteBatchWrapper(
-                                    this.rocksHandle.getDb(), writeBatchSize)) 
{
-
-                List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
-                        tmpRestoreDBInfo.columnFamilyDescriptors;
-                List<ColumnFamilyHandle> tmpColumnFamilyHandles =
-                        tmpRestoreDBInfo.columnFamilyHandles;
-
-                // iterating only the requested descriptors automatically 
skips the default column
-                // family handle
-                for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
-                    ColumnFamilyHandle tmpColumnFamilyHandle = 
tmpColumnFamilyHandles.get(descIdx);
-
-                    ColumnFamilyHandle targetColumnFamilyHandle =
-                            
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
-                                            null,
-                                            
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(descIdx))
-                                    .columnFamilyHandle;
-
-                    try (RocksIteratorWrapper iterator =
-                            RocksDBOperationUtils.getRocksIterator(
-                                    tmpRestoreDBInfo.db,
-                                    tmpColumnFamilyHandle,
-                                    tmpRestoreDBInfo.readOptions)) {
-
-                        iterator.seek(startKeyGroupPrefixBytes);
-
-                        while (iterator.isValid()) {
-
-                            if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
-                                    iterator.key(), stopKeyGroupPrefixBytes)) {
-                                writeBatchWrapper.put(
-                                        targetColumnFamilyHandle, 
iterator.key(), iterator.value());
-                            } else {
-                                // Since the iterator will visit the record 
according to the sorted
-                                // order,
-                                // we can just break here.
-                                break;
-                            }
+        allDownloadSpecs.stream()
+                
.map(StateHandleDownloadSpec::createLocalStateHandleForDownloadedState)
+                .forEach(localKeyedStateHandles::add);
 
-                            iterator.next();
-                        }
-                    } // releases native iterator resources
-                }
+        // Choose the best state handle for the initial DB
+        final IncrementalLocalKeyedStateHandle selectedInitialHandle =
+                
RocksDBIncrementalCheckpointUtils.chooseTheBestStateHandleForInitial(
+                        localKeyedStateHandles, keyGroupRange, 
overlapFractionThreshold);
+        Preconditions.checkNotNull(selectedInitialHandle);
+        // Remove the selected handle from the list so that we don't restore 
it twice.
+        localKeyedStateHandles.remove(selectedInitialHandle);
+
+        try {
+            // Process all state downloads
+            transferRemoteStateToLocalDirectory(allDownloadSpecs);
+
+            // Init the base DB instance with the initial state
+            initBaseDBForRescaling(selectedInitialHandle);
+
+            // Transfer remaining key-groups from temporary instance into base 
DB
+            byte[] startKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+            CompositeKeySerializationUtils.serializeKeyGroup(
+                    keyGroupRange.getStartKeyGroup(), 
startKeyGroupPrefixBytes);
+
+            byte[] stopKeyGroupPrefixBytes = new byte[keyGroupPrefixBytes];
+            CompositeKeySerializationUtils.serializeKeyGroup(
+                    keyGroupRange.getEndKeyGroup() + 1, 
stopKeyGroupPrefixBytes);
+
+            // Insert all remaining state through creating temporary RocksDB 
instances
+            for (IncrementalLocalKeyedStateHandle stateHandle : 
localKeyedStateHandles) {
                 logger.info(
-                        "Finished restoring from state handle: {} with 
rescaling.",
-                        downloadRequest.getStateHandle());
-            } finally {
-                cleanUpPathQuietly(downloadRequest.getDownloadDestination());
+                        "Starting to restore from state handle: {} with 
rescaling.", stateHandle);
+
+                try (RestoredDBInstance tmpRestoreDBInfo =
+                                
restoreTempDBInstanceFromLocalState(stateHandle);
+                        RocksDBWriteBatchWrapper writeBatchWrapper =
+                                new RocksDBWriteBatchWrapper(
+                                        this.rocksHandle.getDb(), 
writeBatchSize)) {
+
+                    List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors =
+                            tmpRestoreDBInfo.columnFamilyDescriptors;
+                    List<ColumnFamilyHandle> tmpColumnFamilyHandles =
+                            tmpRestoreDBInfo.columnFamilyHandles;
+
+                    // iterating only the requested descriptors automatically 
skips the default
+                    // column
+                    // family handle
+                    for (int descIdx = 0; descIdx < 
tmpColumnFamilyDescriptors.size(); ++descIdx) {
+                        ColumnFamilyHandle tmpColumnFamilyHandle =
+                                tmpColumnFamilyHandles.get(descIdx);
+
+                        ColumnFamilyHandle targetColumnFamilyHandle =
+                                
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
+                                                null,
+                                                
tmpRestoreDBInfo.stateMetaInfoSnapshots.get(
+                                                        descIdx))
+                                        .columnFamilyHandle;
+
+                        try (RocksIteratorWrapper iterator =
+                                RocksDBOperationUtils.getRocksIterator(
+                                        tmpRestoreDBInfo.db,
+                                        tmpColumnFamilyHandle,
+                                        tmpRestoreDBInfo.readOptions)) {
+
+                            iterator.seek(startKeyGroupPrefixBytes);
+
+                            while (iterator.isValid()) {
+
+                                if 
(RocksDBIncrementalCheckpointUtils.beforeThePrefixBytes(
+                                        iterator.key(), 
stopKeyGroupPrefixBytes)) {
+                                    writeBatchWrapper.put(
+                                            targetColumnFamilyHandle,
+                                            iterator.key(),
+                                            iterator.value());
+                                } else {
+                                    // Since the iterator will visit the 
record according to the
+                                    // sorted
+                                    // order,
+                                    // we can just break here.
+                                    break;
+                                }
+
+                                iterator.next();
+                            }
+                        } // releases native iterator resources
+                    }
+                    logger.info(
+                            "Finished restoring from state handle: {} with 
rescaling.",
+                            stateHandle);
+                }
             }
+        } finally {
+            // Cleanup all download directories
+            allDownloadSpecs.stream()
+                    .map(StateHandleDownloadSpec::getDownloadDestination)
+                    .forEach(this::cleanUpPathQuietly);
         }
     }
 
-    private void initBaseDBForRescaling(StateHandleDownloadSpec 
downloadedInitialState)
+    private void initBaseDBForRescaling(IncrementalLocalKeyedStateHandle 
stateHandle)
             throws Exception {
 
         // 1. Restore base DB from selected initial handle
-        restoreBaseDBFromDownloadedState(downloadedInitialState);
+        restoreBaseDBFromLocalState(stateHandle);
 
         // 2. Clip the base DB instance
         try {
@@ -418,7 +412,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
                     this.rocksHandle.getDb(),
                     this.rocksHandle.getColumnFamilyHandles(),
                     keyGroupRange,
-                    downloadedInitialState.getStateHandle().getKeyGroupRange(),
+                    stateHandle.getKeyGroupRange(),
                     keyGroupPrefixBytes);
         } catch (RocksDBException e) {
             String errMsg = "Failed to clip DB after initialization.";
@@ -470,11 +464,10 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
         }
     }
 
-    private RestoredDBInstance restoreTempDBInstanceFromDownloadedState(
-            StateHandleDownloadSpec downloadRequest) throws Exception {
-
+    private RestoredDBInstance restoreTempDBInstanceFromLocalState(
+            IncrementalLocalKeyedStateHandle stateHandle) throws Exception {
         KeyedBackendSerializationProxy<K> serializationProxy =
-                
readMetaData(downloadRequest.getStateHandle().getMetaDataStateHandle());
+                readMetaData(stateHandle.getMetaDataStateHandle());
         // read meta data
         List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
                 serializationProxy.getStateMetaInfoSnapshots();
@@ -487,7 +480,7 @@ public class RocksDBIncrementalRestoreOperation<K> 
implements RocksDBRestoreOper
 
         RocksDB restoreDb =
                 RocksDBOperationUtils.openDB(
-                        downloadRequest.getDownloadDestination().toString(),
+                        
stateHandle.getDirectoryStateHandle().getDirectory().toString(),
                         columnFamilyDescriptors,
                         columnFamilyHandles,
                         RocksDBOperationUtils.createColumnFamilyOptions(

Reply via email to