This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new 3d3182a35dc [FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend (#27109)
3d3182a35dc is described below

commit 3d3182a35dcb156b515abe5838d8fdb96baad316
Author: AlexYinHan <[email protected]>
AuthorDate: Tue Oct 14 18:07:34 2025 +0800

    [FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend (#27109)
    
    (cherry picked from commit 78f6e779137bfb732862f237ae6e70b94d0e00ed)
---
 .../flink/state/forst/ForStKeyedStateBackend.java  |  14 +-
 .../state/forst/ForStKeyedStateBackendBuilder.java |  15 +-
 .../flink/state/forst/ForStPathContainer.java      | 193 +++++++++++++++++++++
 .../flink/state/forst/ForStResourceContainer.java  | 114 +++++-------
 .../flink/state/forst/ForStStateBackend.java       |  52 +++---
 .../datatransfer/DataTransferStrategyBuilder.java  |  39 ++++-
 .../forst/datatransfer/ForStStateDataTransfer.java |  10 +-
 .../restore/ForStIncrementalRestoreOperation.java  |  15 +-
 .../forst/sync/ForStSyncKeyedStateBackend.java     |  11 +-
 .../sync/ForStSyncKeyedStateBackendBuilder.java    |   4 +-
 .../apache/flink/state/forst/ForStExtension.java   |  10 +-
 .../state/forst/ForStResourceContainerTest.java    |  12 +-
 .../state/forst/ForStStateBackendConfigTest.java   |  27 ++-
 .../flink/state/forst/ForStStateBackendTest.java   |   6 +-
 .../datatransfer/DataTransferStrategyTest.java     |  78 ++++++++-
 .../datatransfer/ForStStateDataTransferTest.java   |  22 ++-
 16 files changed, 460 insertions(+), 162 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 635d29b8c41..1efde10dfdf 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -596,18 +596,14 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
                 IOUtils.closeQuietly(db);
 
                 LOG.info(
-                        "Closed ForSt State Backend. Cleaning up ForSt local 
working directory {}, remote working directory {}.",
-                        optionsContainer.getLocalBasePath(),
-                        optionsContainer.getRemoteBasePath());
+                        "Closed ForSt State Backend. Cleaning up ForSt: {}.",
+                        optionsContainer.getPathContainer());
 
                 try {
                     optionsContainer.clearDirectories();
                 } catch (Exception ex) {
                     LOG.warn(
-                            "Could not delete ForSt local working directory 
{}, remote working directory {}.",
-                            optionsContainer.getLocalBasePath(),
-                            optionsContainer.getRemoteBasePath(),
-                            ex);
+                            "Could not delete ForSt: {}.", 
optionsContainer.getPathContainer(), ex);
                 }
 
                 IOUtils.closeQuietly(optionsContainer);
@@ -624,12 +620,12 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend<K> {
 
     @VisibleForTesting
     Path getLocalBasePath() {
-        return optionsContainer.getLocalBasePath();
+        return optionsContainer.getPathContainer().getLocalBasePath();
     }
 
     @VisibleForTesting
     Path getRemoteBasePath() {
-        return optionsContainer.getRemoteBasePath();
+        return optionsContainer.getPathContainer().getRemoteBasePath();
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 31473eda4c4..d9b2bc3dd58 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -302,11 +302,7 @@ public class ForStKeyedStateBackendBuilder<K>
                 //       deletion in file mapping manager.
                 optionsContainer.forceClearRemoteDirectories();
             } catch (Exception ex) {
-                logger.warn(
-                        "Failed to delete ForSt local base path {}, remote 
base path {}.",
-                        optionsContainer.getLocalBasePath(),
-                        optionsContainer.getRemoteBasePath(),
-                        ex);
+                logger.warn("Failed to delete ForSt: {}.", 
optionsContainer.getPathContainer(), ex);
             }
             IOUtils.closeQuietly(optionsContainer);
             IOUtils.closeQuietly(snapshotStrategy);
@@ -322,9 +318,8 @@ public class ForStKeyedStateBackendBuilder<K>
         InternalKeyContext<K> keyContext =
                 new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
         logger.info(
-                "Finished building ForSt keyed state-backend at local base 
path: {}, remote base path: {}.",
-                optionsContainer.getLocalBasePath(),
-                optionsContainer.getRemoteBasePath());
+                "Finished building ForSt keyed state-backend at {}",
+                optionsContainer.getPathContainer());
         return new ForStKeyedStateBackend<>(
                 backendUID,
                 executionConfig,
@@ -360,8 +355,8 @@ public class ForStKeyedStateBackendBuilder<K>
         // working dir. We will implement this in ForStDB later, but before 
that, we achieved this
         // by setting the dbPath to "/" when the dfs directory existed.
         Path instanceForStPath =
-                optionsContainer.getRemoteForStPath() == null
-                        ? optionsContainer.getLocalForStPath()
+                optionsContainer.getPathContainer().getRemoteForStPath() == 
null
+                        ? 
optionsContainer.getPathContainer().getLocalForStPath()
                         : new Path("/db");
 
         if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
new file mode 100644
index 00000000000..4cad55c26c5
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Container for the local and remote directory paths used by a ForSt instance.
+ *
+ * <p>Each side (local / remote) carries three levels: the job path, the base path and the ForSt
+ * path. The ForSt path is always derived as {@code <basePath>/db}. Every path may be {@code
+ * null}; the combined getters ({@link #getJobPath()}, {@link #getBasePath()} and {@link
+ * #getDbPath()}) prefer the remote path and fall back to the local one.
+ */
+public class ForStPathContainer {
+
+    // Log under this class's own category (not ForStResourceContainer's) so that the path
+    // summary emitted by the constructor is attributed and filterable correctly.
+    private static final Logger LOG = LoggerFactory.getLogger(ForStPathContainer.class);
+
+    /** Name of the directory, below a base path, that holds the ForSt DB. */
+    public static final String DB_DIR_STRING = "db";
+
+    /**
+     * Local job path. This indicates the parent directory of ForSt, which ends with the Flink
+     * JobID.
+     */
+    @Nullable private final Path localJobPath;
+
+    /**
+     * Local base path. This includes the information of the subtask that holds ForSt, such as the
+     * Operator Identifier and subtask index.
+     */
+    @Nullable private final Path localBasePath;
+
+    /** Local ForSt path. This is the directory of ForSt DB, which ends with 'db'. */
+    @Nullable private final Path localForStPath;
+
+    /**
+     * Remote paths of ForSt. Similar to the respective Path mentioned above, but located under the
+     * remote parent path.
+     */
+    @Nullable private final Path remoteJobPath;
+
+    @Nullable private final Path remoteBasePath;
+    @Nullable private final Path remoteForStPath;
+
+    /** Creates a container in which every path is {@code null}. */
+    public static ForStPathContainer empty() {
+        return of(null, null, null, null);
+    }
+
+    /**
+     * Creates a container that only carries local paths; all remote paths are {@code null}.
+     *
+     * @param localJobPath the local job path, may be {@code null}
+     * @param localBasePath the local base path, may be {@code null}
+     */
+    public static ForStPathContainer ofLocal(
+            @Nullable Path localJobPath, @Nullable Path localBasePath) {
+        return new ForStPathContainer(localJobPath, localBasePath, null, null);
+    }
+
+    /**
+     * Creates a container from the given local and remote paths.
+     *
+     * @param localJobPath the local job path, may be {@code null}
+     * @param localBasePath the local base path, may be {@code null}
+     * @param remoteJobPath the remote job path, may be {@code null}
+     * @param remoteBasePath the remote base path, may be {@code null}
+     */
+    public static ForStPathContainer of(
+            @Nullable Path localJobPath,
+            @Nullable Path localBasePath,
+            @Nullable Path remoteJobPath,
+            @Nullable Path remoteBasePath) {
+        return new ForStPathContainer(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
+    }
+
+    /**
+     * Stores the given paths, derives the two ForSt paths as {@code <basePath>/db} (or {@code
+     * null} when the respective base path is {@code null}) and logs all six resulting paths.
+     *
+     * @param localJobPath the local job path, may be {@code null}
+     * @param localBasePath the local base path, may be {@code null}
+     * @param remoteJobPath the remote job path, may be {@code null}
+     * @param remoteBasePath the remote base path, may be {@code null}
+     */
+    public ForStPathContainer(
+            @Nullable Path localJobPath,
+            @Nullable Path localBasePath,
+            @Nullable Path remoteJobPath,
+            @Nullable Path remoteBasePath) {
+        this.localJobPath = localJobPath;
+        this.localBasePath = localBasePath;
+        this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
+
+        this.remoteJobPath = remoteJobPath;
+        this.remoteBasePath = remoteBasePath;
+        this.remoteForStPath =
+                remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
+
+        LOG.info(
+                "ForStPathContainer: localJobPath: {}, localBasePath: {}, localForStPath:{},  remoteJobPath: {}, remoteBasePath: {}, remoteForStPath: {}",
+                localJobPath,
+                localBasePath,
+                localForStPath,
+                remoteJobPath,
+                remoteBasePath,
+                remoteForStPath);
+    }
+
+    public @Nullable Path getLocalJobPath() {
+        return localJobPath;
+    }
+
+    public @Nullable Path getLocalBasePath() {
+        return localBasePath;
+    }
+
+    public @Nullable Path getLocalForStPath() {
+        return localForStPath;
+    }
+
+    public @Nullable Path getRemoteJobPath() {
+        return remoteJobPath;
+    }
+
+    public @Nullable Path getRemoteBasePath() {
+        return remoteBasePath;
+    }
+
+    public @Nullable Path getRemoteForStPath() {
+        return remoteForStPath;
+    }
+
+    /**
+     * Returns the remote job path if it is set, otherwise the local job path.
+     *
+     * @return the effective job path, or {@code null} if neither is set
+     */
+    public @Nullable Path getJobPath() {
+        return remoteJobPath != null ? remoteJobPath : localJobPath;
+    }
+
+    /**
+     * Returns the remote base path if it is set, otherwise the local base path.
+     *
+     * @return the effective base path, or {@code null} if neither is set
+     */
+    public @Nullable Path getBasePath() {
+        return remoteBasePath != null ? remoteBasePath : localBasePath;
+    }
+
+    /**
+     * Returns the remote ForSt path if it is set, otherwise the local ForSt path.
+     *
+     * @return the effective DB path, or {@code null} if neither is set
+     */
+    public @Nullable Path getDbPath() {
+        return remoteForStPath != null ? remoteForStPath : localForStPath;
+    }
+
+    @Override
+    public String toString() {
+        return "ForStPathContainer(localJobPath = ["
+                + localJobPath
+                + "] localBasePath = ["
+                + localBasePath
+                + "] localForStPath = ["
+                + localForStPath
+                + "] remoteJobPath = ["
+                + remoteJobPath
+                + "] remoteBasePath = ["
+                + remoteBasePath
+                + "] remoteForStPath = ["
+                + remoteForStPath
+                + "])";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ForStPathContainer that = (ForStPathContainer) o;
+        return Objects.equals(localJobPath, that.localJobPath)
+                && Objects.equals(localBasePath, that.localBasePath)
+                && Objects.equals(localForStPath, that.localForStPath)
+                && Objects.equals(remoteJobPath, that.remoteJobPath)
+                && Objects.equals(remoteBasePath, that.remoteBasePath)
+                && Objects.equals(remoteForStPath, that.remoteForStPath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                localJobPath,
+                localBasePath,
+                localForStPath,
+                remoteJobPath,
+                remoteBasePath,
+                remoteForStPath);
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index d4c5a7cc831..cb29cf1857c 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -73,8 +73,6 @@ import static 
org.apache.flink.state.forst.ForStOptions.CACHE_SIZE_BASE_LIMIT;
 public final class ForStResourceContainer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ForStResourceContainer.class);
 
-    public static final String DB_DIR_STRING = "db";
-
     private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
 
     // the filename length limit is 255 on most operating systems
@@ -82,16 +80,10 @@ public final class ForStResourceContainer implements 
AutoCloseable {
     // and the db data dir's absolute path will be used as the log file name's 
prefix.
     private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - 
FORST_RELOCATE_LOG_SUFFIX.length();
 
-    @Nullable private final Path remoteBasePath;
-
-    @Nullable private final Path remoteForStPath;
+    private final ForStPathContainer pathContainer;
 
     private boolean remotePathNewlyCreated;
 
-    @Nullable private final Path localBasePath;
-
-    @Nullable private final Path localForStPath;
-
     @Nullable private Path cacheBasePath;
 
     private final long cacheCapacity;
@@ -130,8 +122,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
                 new Configuration(),
                 null,
                 null,
-                null,
-                null,
+                ForStPathContainer.empty(),
                 RecoveryClaimMode.DEFAULT,
                 null,
                 null,
@@ -144,8 +135,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
                 new Configuration(),
                 optionsFactory,
                 null,
-                null,
-                null,
+                ForStPathContainer.empty(),
                 RecoveryClaimMode.DEFAULT,
                 null,
                 null,
@@ -160,8 +150,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
                 new Configuration(),
                 optionsFactory,
                 sharedResources,
-                null,
-                null,
+                ForStPathContainer.empty(),
                 RecoveryClaimMode.DEFAULT,
                 null,
                 null,
@@ -172,8 +161,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
             ReadableConfig configuration,
             @Nullable ForStOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources,
-            @Nullable Path localBasePath,
-            @Nullable Path remoteBasePath,
+            ForStPathContainer pathContainer,
             RecoveryClaimMode claimMode,
             @Nullable CheckpointStorageAccess checkpointStorageAccess,
             MetricGroup metricGroup,
@@ -183,11 +171,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         this.optionsFactory = optionsFactory;
         this.sharedResources = sharedResources;
 
-        this.localBasePath = localBasePath;
-        this.localForStPath = localBasePath != null ? new Path(localBasePath, 
DB_DIR_STRING) : null;
-        this.remoteBasePath = remoteBasePath;
-        this.remoteForStPath =
-                remoteBasePath != null ? new Path(remoteBasePath, 
DB_DIR_STRING) : null;
+        this.pathContainer = pathContainer;
 
         this.enableStatistics = enableStatistics;
         this.handlesToClose = new ArrayList<>();
@@ -228,10 +212,10 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         // TODO: Fallback to checkpoint directory when checkpoint feature is 
ready if not
         // configured,
         //  fallback to local directory currently temporarily.
-        if (remoteForStPath != null) {
+        if (pathContainer.getRemoteForStPath() != null) {
             FlinkEnv flinkEnv =
                     new FlinkEnv(
-                            remoteBasePath.toString(),
+                            pathContainer.getRemoteBasePath().toString(),
                             new StringifiedForStFileSystem(forStFileSystem));
             opt.setEnv(flinkEnv);
             handlesToClose.add(flinkEnv);
@@ -312,40 +296,16 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         return opt;
     }
 
-    @Nullable
-    public Path getLocalBasePath() {
-        return localBasePath;
-    }
-
-    @Nullable
-    public Path getLocalForStPath() {
-        return localForStPath;
-    }
-
-    @Nullable
-    public Path getRemoteBasePath() {
-        return remoteBasePath;
-    }
-
-    @Nullable
-    public Path getRemoteForStPath() {
-        return remoteForStPath;
+    public ForStPathContainer getPathContainer() {
+        return pathContainer;
     }
 
     public Path getBasePath() {
-        if (remoteBasePath != null) {
-            return remoteBasePath;
-        } else {
-            return localBasePath;
-        }
+        return pathContainer.getBasePath();
     }
 
     public Path getDbPath() {
-        if (remoteForStPath != null) {
-            return remoteForStPath;
-        } else {
-            return localForStPath;
-        }
+        return pathContainer.getDbPath();
     }
 
     public boolean isCoordinatorInline() {
@@ -370,28 +330,31 @@ public final class ForStResourceContainer implements 
AutoCloseable {
      * @throws Exception if any unexpected behaviors.
      */
     public void prepareDirectories() throws Exception {
-        if (remoteBasePath != null && remoteForStPath != null) {
-            remotePathNewlyCreated = prepareDirectories(remoteBasePath, 
remoteForStPath);
+        if (pathContainer.getRemoteBasePath() != null
+                && pathContainer.getRemoteForStPath() != null) {
+            remotePathNewlyCreated =
+                    prepareDirectories(
+                            pathContainer.getRemoteBasePath(), 
pathContainer.getRemoteForStPath());
         }
-        if (localBasePath != null && localForStPath != null) {
-            prepareDirectories(
-                    new Path(localBasePath.getPath()), new 
Path(localForStPath.getPath()));
+        if (pathContainer.getLocalBasePath() != null && 
pathContainer.getLocalForStPath() != null) {
+            prepareDirectories(pathContainer.getLocalBasePath(), 
pathContainer.getLocalForStPath());
         }
-        if (remoteForStPath != null && localForStPath != null) {
-            if (cacheBasePath == null && localBasePath != null) {
-                cacheBasePath = new Path(localBasePath.getPath(), "cache");
+        if (pathContainer.getRemoteForStPath() != null
+                && pathContainer.getLocalForStPath() != null) {
+            if (cacheBasePath == null && pathContainer.getLocalBasePath() != 
null) {
+                cacheBasePath = new 
Path(pathContainer.getLocalBasePath().getPath(), "cache");
                 LOG.info(
                         "Cache base path is not configured, set to local base 
path: {}",
                         cacheBasePath);
             }
             forStFileSystem =
                     ForStFlinkFileSystem.get(
-                            remoteForStPath.toUri(),
-                            localForStPath,
+                            pathContainer.getRemoteForStPath().toUri(),
+                            pathContainer.getLocalForStPath(),
                             ForStFlinkFileSystem.getFileBasedCache(
                                     configuration,
                                     cacheBasePath,
-                                    remoteForStPath,
+                                    pathContainer.getRemoteForStPath(),
                                     cacheCapacity,
                                     cacheReservedSize,
                                     metricGroup));
@@ -432,17 +395,19 @@ public final class ForStResourceContainer implements 
AutoCloseable {
      * @throws Exception if any unexpected behaviors.
      */
     public void clearDirectories() throws Exception {
+        Path remoteBasePath = pathContainer.getRemoteBasePath();
         if (remoteBasePath != null) {
             forStFileSystem.delete(remoteBasePath, true);
         }
+        Path localBasePath = pathContainer.getLocalBasePath();
         if (localBasePath != null) {
             clearDirectories(localBasePath);
         }
     }
 
     public void forceClearRemoteDirectories() throws Exception {
-        if (remoteBasePath != null && remotePathNewlyCreated) {
-            clearDirectories(remoteBasePath);
+        if (pathContainer.getRemoteBasePath() != null && 
remotePathNewlyCreated) {
+            clearDirectories(pathContainer.getRemoteBasePath());
         }
     }
 
@@ -529,9 +494,10 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
         if (logDir == null || logDir.isEmpty()) {
             // only relocate db log dir in local mode
-            if (remoteForStPath == null
-                    && localForStPath != null
-                    && localForStPath.getPath().length() <= 
INSTANCE_PATH_LENGTH_LIMIT) {
+            if (pathContainer.getRemoteForStPath() == null
+                    && pathContainer.getLocalForStPath() != null
+                    && pathContainer.getLocalForStPath().getPath().length()
+                            <= INSTANCE_PATH_LENGTH_LIMIT) {
                 relocateDefaultDbLogDir(currentOptions);
             }
         } else {
@@ -647,9 +613,10 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         // log dir, which results in "/" being used as the log directory. This 
often has permission
         // issues, so the db log dir is temporarily set explicitly here.
         // TODO: remove this method after ForSt deal log dir well
-        if (localForStPath != null) {
-            this.relocatedDbLogBaseDir = 
java.nio.file.Path.of(localForStPath.toUri().toString());
-            dbOptions.setDbLogDir(localForStPath.getPath());
+        if (pathContainer.getLocalForStPath() != null) {
+            this.relocatedDbLogBaseDir =
+                    
java.nio.file.Path.of(pathContainer.getLocalForStPath().toUri().toString());
+            dbOptions.setDbLogDir(pathContainer.getLocalForStPath().getPath());
         }
     }
 
@@ -666,10 +633,11 @@ public final class ForStResourceContainer implements 
AutoCloseable {
 
     /** Clean all relocated ForSt logs. */
     private void cleanRelocatedDbLogs() {
-        if (localForStPath != null && relocatedDbLogBaseDir != null) {
+        if (pathContainer.getLocalForStPath() != null && relocatedDbLogBaseDir 
!= null) {
             LOG.info("Cleaning up relocated ForSt logs: {}.", 
relocatedDbLogBaseDir);
 
-            String relocatedDbLogPrefix = 
resolveRelocatedDbLogPrefix(localForStPath.getPath());
+            String relocatedDbLogPrefix =
+                    
resolveRelocatedDbLogPrefix(pathContainer.getLocalForStPath().getPath());
             try {
                 Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
                         .filter(
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 328ff9eb709..e8f51c0c631 100644
--- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -418,7 +417,8 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
         lazyInitializeForJob(env, fileCompatibleIdentifier);
 
-        Tuple2<Path, Path> localAndRemoteBasePath = 
getForStBasePath(fileCompatibleIdentifier, env);
+        ForStPathContainer pathContainer =
+                createForStPathContainer(fileCompatibleIdentifier, env, false);
 
         final OpaqueMemoryResource<ForStSharedResources> sharedResources =
                 ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -433,8 +433,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         final ForStResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(
                         sharedResources,
-                        localAndRemoteBasePath.f0,
-                        localAndRemoteBasePath.f1,
+                        pathContainer,
                         env.getCheckpointStorageAccess(),
                         parameters.getMetricGroup(),
                         nativeMetricOptions.isStatisticsEnabled());
@@ -490,7 +489,8 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
         lazyInitializeForJob(env, fileCompatibleIdentifier);
 
-        Tuple2<Path, Path> localAndRemoteBasePath = 
getForStBasePath(fileCompatibleIdentifier, env);
+        ForStPathContainer pathContainer =
+                createForStPathContainer(fileCompatibleIdentifier, env, 
forceSyncLocal);
 
         LocalRecoveryConfig localRecoveryConfig =
                 env.getTaskStateManager().createLocalRecoveryConfig();
@@ -508,8 +508,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         final ForStResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(
                         sharedResources,
-                        localAndRemoteBasePath.f0,
-                        forceSyncLocal ? null : localAndRemoteBasePath.f1,
+                        pathContainer,
                         env.getCheckpointStorageAccess(),
                         parameters.getMetricGroup(),
                         nativeMetricOptions.isStatisticsEnabled());
@@ -795,25 +794,32 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
         return configuration;
     }
 
-    Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment 
env) {
+    ForStPathContainer createForStPathContainer(
+            String operatorIdentifier, Environment env, boolean forceLocal) {
         String opChildPath =
                 String.format(
                         "op_%s_attempt_%s",
                         operatorIdentifier, 
env.getTaskInfo().getAttemptNumber());
 
-        Path localBasePath =
-                new Path(
-                        new File(new File(getNextStoragePath(), 
jobId.toHexString()), opChildPath)
-                                .getAbsolutePath());
+        File localJobFile = new File(getNextStoragePath(), 
jobId.toHexString());
+        Path localJobPath = new Path(localJobFile.getPath());
+        Path localBasePath = new Path(new File(localJobFile, 
opChildPath).getAbsolutePath());
+
+        if (forceLocal) {
+            return ForStPathContainer.ofLocal(localJobPath, localBasePath);
+        }
+
+        Path remoteJobPath = null;
         Path remoteBasePath = null;
         if (remoteForStDirectory != null) {
-            remoteBasePath =
-                    new Path(new Path(remoteForStDirectory, 
jobId.toHexString()), opChildPath);
+            remoteJobPath = new Path(remoteForStDirectory, 
jobId.toHexString());
+            remoteBasePath = new Path(remoteJobPath, opChildPath);
         } else if (remoteShareWithCheckpoint) {
             if (env.getCheckpointStorageAccess() instanceof 
FsCheckpointStorageAccess) {
-                Path sharedStateDirectory =
-                        ((FsCheckpointStorageAccess) 
env.getCheckpointStorageAccess())
-                                .getSharedStateDirectory();
+                FsCheckpointStorageAccess fsCheckpointStorageAccess =
+                        (FsCheckpointStorageAccess) 
env.getCheckpointStorageAccess();
+                remoteJobPath = 
fsCheckpointStorageAccess.getCheckpointsDirectory();
+                Path sharedStateDirectory = 
fsCheckpointStorageAccess.getSharedStateDirectory();
                 remoteBasePath = new Path(sharedStateDirectory, opChildPath);
                 LOG.info("Set remote ForSt directory to checkpoint directory 
{}", remoteBasePath);
             } else {
@@ -821,19 +827,20 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                         "Remote ForSt directory can't be set, because 
checkpoint directory isn't on file system.");
             }
         }
-        return Tuple2.of(localBasePath, remoteBasePath);
+
+        return ForStPathContainer.of(localJobPath, localBasePath, 
remoteJobPath, remoteBasePath);
     }
 
     @VisibleForTesting
     ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path 
localBasePath) {
-        return createOptionsAndResourceContainer(null, localBasePath, null, 
null, null, false);
+        return createOptionsAndResourceContainer(
+                null, ForStPathContainer.ofLocal(null, localBasePath), null, 
null, false);
     }
 
     @VisibleForTesting
     private ForStResourceContainer createOptionsAndResourceContainer(
             @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources,
-            @Nullable Path localBasePath,
-            @Nullable Path remoteBasePath,
+            ForStPathContainer pathContainer,
             @Nullable CheckpointStorageAccess checkpointStorageAccess,
             @Nullable MetricGroup metricGroup,
             boolean enableStatistics) {
@@ -842,8 +849,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
                 configurableOptions != null ? configurableOptions : new 
Configuration(),
                 forStOptionsFactory,
                 sharedResources,
-                localBasePath,
-                remoteBasePath,
+                pathContainer,
                 recoveryClaimMode,
                 checkpointStorageAccess,
                 metricGroup,
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
index 68d54b17799..29cb9be25ae 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.state.forst.ForStPathContainer;
 import org.apache.flink.state.forst.StateHandleTransferSpec;
 import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
 
@@ -72,12 +73,12 @@ public class DataTransferStrategyBuilder {
                     forStFlinkFileSystem == null
                             ? new CopyDataTransferStrategy()
                             : new 
CopyDataTransferStrategy(forStFlinkFileSystem);
-            LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
+            LOG.debug("Build DataTransferStrategy for Snapshot: {}", strategy);
             return strategy;
         }
 
         strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
-        LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
+        LOG.debug("Build DataTransferStrategy for Snapshot: {}", strategy);
         return strategy;
     }
 
@@ -122,29 +123,34 @@ public class DataTransferStrategyBuilder {
 
     public static DataTransferStrategy buildForRestore(
             @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
+            ForStPathContainer forStPathContainer,
             Collection<StateHandleTransferSpec> specs,
             RecoveryClaimMode recoveryClaimMode) {
         DataTransferStrategy strategy;
         FileSystem cpSharedFs = getSharedStateFileSystem(specs);
+        boolean isDbUnderSameJobPathFromRestore =
+                isDbUnderSameJobPathFromRestore(forStPathContainer, specs);
         if (forStFlinkFileSystem == null
                 || cpSharedFs == null
                 || !forStFlinkFileSystem.getUri().equals(cpSharedFs.getUri())
-                || recoveryClaimMode == RecoveryClaimMode.NO_CLAIM) {
+                || (!isDbUnderSameJobPathFromRestore
+                        && recoveryClaimMode == RecoveryClaimMode.NO_CLAIM)) {
             strategy =
                     forStFlinkFileSystem == null
                             ? new CopyDataTransferStrategy()
                             : new 
CopyDataTransferStrategy(forStFlinkFileSystem);
-            LOG.info(
-                    "Build DataTransferStrategy for Restore: {}, 
forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}",
+            LOG.debug(
+                    "Build DataTransferStrategy for Restore: {}, 
forStFlinkFileSystem: {}, cpSharedFs:{}, isDbUnderSameJobPathFromRestore:{}, 
recoveryClaimMode:{}",
                     strategy,
                     forStFlinkFileSystem,
                     cpSharedFs,
+                    isDbUnderSameJobPathFromRestore,
                     recoveryClaimMode);
             return strategy;
         }
 
         strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
-        LOG.info("Build DataTransferStrategy for Restore: {}", strategy);
+        LOG.debug("Build DataTransferStrategy for Restore: {}", strategy);
         return strategy;
     }
 
@@ -168,4 +174,25 @@ public class DataTransferStrategyBuilder {
         }
         return null;
     }
+
+    // Verify if the job path matches the restored path. A match indicates 
that the job is being
+    // restored from a failover.
+    private static boolean isDbUnderSameJobPathFromRestore(
+            ForStPathContainer forStPathContainer, 
Collection<StateHandleTransferSpec> specs) {
+        String jobPathStr = forStPathContainer.getJobPath().getPath();
+        for (StateHandleTransferSpec spec : specs) {
+            IncrementalRemoteKeyedStateHandle stateHandle = 
spec.getStateHandle();
+            for (IncrementalKeyedStateHandle.HandleAndLocalPath 
handleAndLocalPath :
+                    stateHandle.getSharedState()) {
+                StreamStateHandle handle = handleAndLocalPath.getHandle();
+                if (handle instanceof FileStateHandle) {
+                    Path dbRemotePath = ((FileStateHandle) 
handle).getFilePath();
+                    if (!dbRemotePath.getPath().startsWith(jobPathStr)) {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
index 4dfc9c0a91f..0305f8cd02e 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.state.forst.ForStKeyedStateBackend;
+import org.apache.flink.state.forst.ForStPathContainer;
 import org.apache.flink.state.forst.StateHandleTransferSpec;
 import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
 import org.apache.flink.util.ExceptionUtils;
@@ -231,6 +232,7 @@ public class ForStStateDataTransfer implements Closeable {
      * @throws Exception If anything about the transfer goes wrong.
      */
     public void transferAllStateDataToDirectory(
+            ForStPathContainer forStPathContainer,
             Collection<StateHandleTransferSpec> transferSpecs,
             CloseableRegistry closeableRegistry,
             RecoveryClaimMode recoveryClaimMode)
@@ -244,7 +246,10 @@ public class ForStStateDataTransfer implements Closeable {
         try {
             List<CompletableFuture<Void>> futures =
                     transferAllStateDataToDirectoryAsync(
-                                    transferSpecs, internalCloser, 
recoveryClaimMode)
+                                    forStPathContainer,
+                                    transferSpecs,
+                                    internalCloser,
+                                    recoveryClaimMode)
                             .collect(Collectors.toList());
 
             // Wait until either all futures completed successfully or one 
failed exceptionally.
@@ -276,12 +281,13 @@ public class ForStStateDataTransfer implements Closeable {
 
     /** Asynchronously runs the specified transfer requests on 
executorService. */
     private Stream<CompletableFuture<Void>> 
transferAllStateDataToDirectoryAsync(
+            ForStPathContainer forStPathContainer,
             Collection<StateHandleTransferSpec> transferSpecs,
             CloseableRegistry closeableRegistry,
             RecoveryClaimMode recoveryClaimMode) {
         DataTransferStrategy strategy =
                 DataTransferStrategyBuilder.buildForRestore(
-                        forStFs, transferSpecs, recoveryClaimMode);
+                        forStFs, forStPathContainer, transferSpecs, 
recoveryClaimMode);
 
         return transferSpecs.stream()
                 .flatMap(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
index 699da08a53d..f205fe21b9f 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
@@ -45,6 +45,7 @@ import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
 import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils;
 import org.apache.flink.state.forst.ForStNativeMetricOptions;
 import org.apache.flink.state.forst.ForStOperationUtils;
+import org.apache.flink.state.forst.ForStPathContainer;
 import org.apache.flink.state.forst.ForStResourceContainer;
 import org.apache.flink.state.forst.StateHandleTransferSpec;
 import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
@@ -93,7 +94,6 @@ import static 
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.check
 import static 
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange;
 import static 
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial;
 import static 
org.apache.flink.state.forst.ForStOperationUtils.createColumnFamilyOptions;
-import static 
org.apache.flink.state.forst.ForStResourceContainer.DB_DIR_STRING;
 
 /** Encapsulates the process of restoring a ForSt instance from an incremental 
snapshot. */
 public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperation {
@@ -217,7 +217,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
             toTransferSpecs.add(
                     new StateHandleTransferSpec(
                             restoreStateHandles.get(bestStateHandleForInit),
-                            new Path(forstBasePath, DB_DIR_STRING)));
+                            new Path(forstBasePath, 
ForStPathContainer.DB_DIR_STRING)));
         }
         for (int i = 0; i < restoreStateHandles.size(); i++) {
             if (i != bestStateHandleForInit) {
@@ -268,7 +268,10 @@ public class ForStIncrementalRestoreOperation<K> 
implements ForStRestoreOperatio
                         ForStStateDataTransfer.DEFAULT_THREAD_NUM,
                         optionsContainer.getFileSystem())) {
             transfer.transferAllStateDataToDirectory(
-                    specs, cancelStreamRegistry, recoveryClaimMode);
+                    optionsContainer.getPathContainer(),
+                    specs,
+                    cancelStreamRegistry,
+                    recoveryClaimMode);
         }
     }
 
@@ -697,7 +700,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
                 StateHandleTransferSpec baseSpec =
                         new StateHandleTransferSpec(
                                 
restoreStateHandles.get(bestStateHandleForInit),
-                                new Path(forstBasePath, DB_DIR_STRING));
+                                new Path(forstBasePath, 
ForStPathContainer.DB_DIR_STRING));
                 transferAllStateHandles(Collections.singletonList(baseSpec));
                 mergeStateHandlesWithCopyFromTemporaryInstance(
                         baseSpec,
@@ -751,7 +754,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
                 String uuid = UUID.randomUUID().toString();
 
                 String subPathStr =
-                        optionsContainer.getRemoteBasePath() != null
+                        
optionsContainer.getPathContainer().getRemoteBasePath() != null
                                 ? exportBasePath.getName() + "/" + uuid
                                 : exportBasePath.toString() + "/" + uuid;
                 ExportImportFilesMetaData exportedColumnFamilyMetaData =
@@ -953,7 +956,7 @@ public class ForStIncrementalRestoreOperation<K> implements 
ForStRestoreOperatio
                 new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
 
         String dbName =
-                optionsContainer.getRemoteBasePath() != null
+                optionsContainer.getPathContainer().getRemoteBasePath() != null
                         ? "/" + 
stateHandleSpec.getTransferDestination().getName()
                         : stateHandleSpec.getTransferDestination().toString();
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 0cd1312cc44..2d35bd13d41 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -489,18 +489,13 @@ public class ForStSyncKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K>
             columnFamilyOptions.forEach(IOUtils::closeQuietly);
 
             LOG.info(
-                    "Closed ForSt State Backend. Cleaning up ForSt local 
working directory {}, remote working directory {}.",
-                    optionsContainer.getLocalBasePath(),
-                    optionsContainer.getRemoteBasePath());
+                    "Closed ForSt State Backend. Cleaning up ForSt: {}.",
+                    optionsContainer.getPathContainer());
 
             try {
                 optionsContainer.clearDirectories();
             } catch (Exception ex) {
-                LOG.warn(
-                        "Could not delete ForSt local working directory {}, 
remote working directory {}.",
-                        optionsContainer.getLocalBasePath(),
-                        optionsContainer.getRemoteBasePath(),
-                        ex);
+                LOG.warn("Could not delete ForSt: {}.", 
optionsContainer.getPathContainer(), ex);
             }
 
             IOUtils.closeQuietly(optionsContainer);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 07de516c71d..0e83b22a0ce 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -506,8 +506,8 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBack
         // working dir. We will implement this in ForStDB later, but before 
that, we achieved this
         // by setting the dbPath to "/" when the dfs directory existed.
         Path instanceForStPath =
-                optionsContainer.getRemoteForStPath() == null
-                        ? optionsContainer.getLocalForStPath()
+                optionsContainer.getPathContainer().getRemoteForStPath() == 
null
+                        ? 
optionsContainer.getPathContainer().getLocalForStPath()
                         : new Path("/db");
 
         if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
index 6e6c01280d4..99b4c5c68b5 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
@@ -164,13 +164,14 @@ public class ForStExtension implements 
BeforeEachCallback, AfterEachCallback {
 
         resourceGuard = new ResourceGuard();
         File localWorkingDir = TempDirUtils.newFolder(rocksFolder.toPath(), 
"local-working-dir");
+        Path localJobPath = new Path(localWorkingDir.getAbsolutePath());
+        Path localBasePath = new Path(localJobPath, "base");
         this.resourceContainer =
                 new ForStResourceContainer(
                         new Configuration(),
                         optionsFactory,
                         null,
-                        new Path(localWorkingDir.getAbsolutePath()),
-                        null,
+                        ForStPathContainer.ofLocal(localJobPath, 
localBasePath),
                         null,
                         null,
                         null,
@@ -190,9 +191,10 @@ public class ForStExtension implements BeforeEachCallback, 
AfterEachCallback {
         // working dir. We will implement this in ForStDB later, but before 
that, we achieved this
         // by setting the dbPath to "/" when the dfs directory existed.
         // TODO: use localForStPath as dbPath after ForSt Support mixing 
local-dir and remote-dir
+        ForStPathContainer pathContainer = 
resourceContainer.getPathContainer();
         Path instanceForStPath =
-                resourceContainer.getRemoteForStPath() == null
-                        ? resourceContainer.getLocalForStPath()
+                pathContainer.getRemoteForStPath() == null
+                        ? pathContainer.getLocalForStPath()
                         : new Path("/");
 
         this.columnFamilyHandles = new ArrayList<>(1);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index fd9ae8ea111..9085f6a1fd2 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -308,15 +308,19 @@ public class ForStResourceContainerTest {
 
     @Test
     public void testDirectoryResources() throws Exception {
-        Path localBasePath = new Path(TMP_FOLDER.newFolder().getPath());
-        Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath());
+        Path localJobPath = new Path(TMP_FOLDER.newFolder().getPath());
+        Path localBasePath = new Path(localJobPath, "base");
+        localBasePath.getFileSystem().mkdirs(localBasePath);
+        Path remoteJobPath = new Path(TMP_FOLDER.newFolder().getPath());
+        Path remoteBasePath = new Path(remoteJobPath, "base");
+        remoteBasePath.getFileSystem().mkdirs(remoteBasePath);
         try (final ForStResourceContainer optionsContainer =
                 new ForStResourceContainer(
                         new Configuration(),
                         null,
                         null,
-                        localBasePath,
-                        remoteBasePath,
+                        ForStPathContainer.of(
+                                localJobPath, localBasePath, remoteJobPath, 
remoteBasePath),
                         null,
                         new FsCheckpointStorageAccess(
                                 new Path(TMP_FOLDER.newFolder().getPath()),
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
index 78b2a0bd79d..50f3ac0954b 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
@@ -530,7 +530,14 @@ public class ForStStateBackendConfigTest {
 
             try (ForStResourceContainer optionsContainer =
                     new ForStResourceContainer(
-                            configuration, null, null, null, null, null, null, 
null, false)) {
+                            configuration,
+                            null,
+                            null,
+                            ForStPathContainer.empty(),
+                            null,
+                            null,
+                            null,
+                            false)) {
 
                 DBOptions dbOptions = optionsContainer.getDbOptions();
                 assertEquals(-1, dbOptions.maxOpenFiles());
@@ -615,7 +622,14 @@ public class ForStStateBackendConfigTest {
         configuration.set(ForStConfigurableOptions.COMPACTION_STYLE, 
CompactionStyle.UNIVERSAL);
         try (final ForStResourceContainer optionsContainer =
                 new ForStResourceContainer(
-                        configuration, null, null, null, null, null, null, 
null, false)) {
+                        configuration,
+                        null,
+                        null,
+                        ForStPathContainer.empty(),
+                        null,
+                        null,
+                        null,
+                        false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
             assertNotNull(columnFamilyOptions);
@@ -624,7 +638,14 @@ public class ForStStateBackendConfigTest {
 
         try (final ForStResourceContainer optionsContainer =
                 new ForStResourceContainer(
-                        new Configuration(), null, null, null, null, null, 
null, null, false)) {
+                        new Configuration(),
+                        null,
+                        null,
+                        ForStPathContainer.empty(),
+                        null,
+                        null,
+                        null,
+                        false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
             assertNotNull(columnFamilyOptions);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 7c4e58e1ce6..b3d8bc8e364 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -130,7 +130,8 @@ class ForStStateBackendTest extends 
StateBackendTestBase<ForStStateBackend> {
         ForStStateBackend backend = new ForStStateBackend();
         ForStSyncKeyedStateBackend keyedStateBackend1 =
                 (ForStSyncKeyedStateBackend) 
createKeyedBackend(IntSerializer.INSTANCE);
-        
assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull();
+        
assertThat(keyedStateBackend1.getOptionsContainer().getPathContainer().getRemoteBasePath())
+                .isNull();
         Configuration config = new Configuration();
         config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false);
         backend = backend.configure(config, 
Thread.currentThread().getContextClassLoader());
@@ -151,6 +152,7 @@ class ForStStateBackendTest extends 
StateBackendTestBase<ForStStateBackend> {
                                         Collections.emptyList(),
                                         new CloseableRegistry(),
                                         1.0d));
-        
assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull();
+        
assertThat(keyedStateBackend2.getOptionsContainer().getPathContainer().getRemoteBasePath())
+                .isNotNull();
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
index 73f3f46df4f..deddefe8835 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.state.forst.ForStPathContainer;
 import org.apache.flink.state.forst.StateHandleTransferSpec;
 import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
 import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
@@ -471,11 +472,14 @@ public class DataTransferStrategyTest {
 
     void testRestoreStrategyAsExpected(
             @Nullable ForStFlinkFileSystem forStFlinkFileSystem,
+            String sourceDirectoryStr,
+            String desDirStr,
             RecoveryClaimMode recoveryClaimMode,
             Class<?> expected) {
         List<HandleAndLocalPath> sharedStateHandleList = new ArrayList<>();
         sharedStateHandleList.add(
-                HandleAndLocalPath.of(new FileStateHandle(new Path("1.sst"), 
0), "1.sst"));
+                HandleAndLocalPath.of(
+                        new FileStateHandle(new Path(sourceDirectoryStr + 
"/1.sst"), 0), "1.sst"));
         IncrementalRemoteKeyedStateHandle stateHandle =
                 new IncrementalRemoteKeyedStateHandle(
                         UUID.randomUUID(),
@@ -484,12 +488,15 @@ public class DataTransferStrategyTest {
                         sharedStateHandleList,
                         Collections.emptyList(),
                         new FileStateHandle(new Path("meta"), 0));
+        Path destJobDir = new Path(desDirStr);
+        Path destBaseDir = new Path(destJobDir, "base");
         assertThat(
                         DataTransferStrategyBuilder.buildForRestore(
                                         forStFlinkFileSystem,
+                                        ForStPathContainer.of(null, null, 
destJobDir, destBaseDir),
                                         Collections.singletonList(
                                                 new StateHandleTransferSpec(
-                                                        stateHandle, new 
Path("dst"))),
+                                                        stateHandle, 
destBaseDir)),
                                         recoveryClaimMode)
                                 .getClass())
                 .isEqualTo(expected);
@@ -524,19 +531,76 @@ public class DataTransferStrategyTest {
                 CopyDataTransferStrategy.class);
 
         testRestoreStrategyAsExpected(
-                forStFlinkFileSystem, RecoveryClaimMode.CLAIM, 
ReusableDataTransferStrategy.class);
+                forStFlinkFileSystem,
+                "/src-dir",
+                "/dst-dir",
+                RecoveryClaimMode.CLAIM,
+                ReusableDataTransferStrategy.class);
+
+        testRestoreStrategyAsExpected(
+                forStFlinkFileSystem,
+                "/src-dir",
+                "/dst-dir",
+                RecoveryClaimMode.NO_CLAIM,
+                CopyDataTransferStrategy.class);
+
+        testRestoreStrategyAsExpected(
+                forStFlinkFileSystem,
+                "/src-dir",
+                "/dst-dir",
+                RecoveryClaimMode.LEGACY,
+                ReusableDataTransferStrategy.class);
+
+        testRestoreStrategyAsExpected(
+                null,
+                "/src-dir",
+                "/dst-dir",
+                RecoveryClaimMode.CLAIM,
+                CopyDataTransferStrategy.class);
+
+        testRestoreStrategyAsExpected(
+                null,
+                "/src-dir",
+                "/dst-dir",
+                RecoveryClaimMode.NO_CLAIM,
+                CopyDataTransferStrategy.class);
 
+        // Restoring from the same directory indicates a failover scenario, 
allowing us to reuse the
+        // files if we are in a disaggregated setup.
         testRestoreStrategyAsExpected(
-                forStFlinkFileSystem, RecoveryClaimMode.NO_CLAIM, 
CopyDataTransferStrategy.class);
+                forStFlinkFileSystem,
+                "/same-dir",
+                "/same-dir",
+                RecoveryClaimMode.CLAIM,
+                ReusableDataTransferStrategy.class);
 
         testRestoreStrategyAsExpected(
-                forStFlinkFileSystem, RecoveryClaimMode.LEGACY, 
ReusableDataTransferStrategy.class);
+                forStFlinkFileSystem,
+                "/same-dir",
+                "/same-dir",
+                RecoveryClaimMode.NO_CLAIM,
+                ReusableDataTransferStrategy.class);
 
         testRestoreStrategyAsExpected(
-                null, RecoveryClaimMode.CLAIM, CopyDataTransferStrategy.class);
+                forStFlinkFileSystem,
+                "/same-dir",
+                "/same-dir",
+                RecoveryClaimMode.LEGACY,
+                ReusableDataTransferStrategy.class);
 
         testRestoreStrategyAsExpected(
-                null, RecoveryClaimMode.NO_CLAIM, 
CopyDataTransferStrategy.class);
+                null,
+                "/same-dir",
+                "/same-dir",
+                RecoveryClaimMode.CLAIM,
+                CopyDataTransferStrategy.class);
+
+        testRestoreStrategyAsExpected(
+                null,
+                "/same-dir",
+                "/same-dir",
+                RecoveryClaimMode.NO_CLAIM,
+                CopyDataTransferStrategy.class);
     }
 
     @TestTemplate
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
index bfa90404adf..3b161ad1384 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.state.forst.ForStPathContainer;
 import org.apache.flink.state.forst.StateHandleTransferSpec;
 import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.IOUtils;
@@ -71,6 +72,14 @@ class ForStStateDataTransferTest extends TestLogger {
 
     @TempDir private java.nio.file.Path temporaryFolder;
 
+    private ForStPathContainer createPathContainer() throws IOException {
+        Path localJobPath = Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder));
+        Path localBasePath = new Path(localJobPath, "base");
+        Path remoteJobPath = Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder));
+        Path remoteBasePath = new Path(remoteJobPath, "base");
+        return ForStPathContainer.of(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
+    }
+
     /** Test that the exception arose in the thread pool will rethrow to the main thread. */
     @Test
     void testMultiThreadTransferThreadPoolExceptionRethrow() throws IOException {
@@ -465,11 +474,13 @@ class ForStStateDataTransferTest extends TestLogger {
                         stateHandle);
 
         try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(5)) {
+            ForStPathContainer pathContainer = createPathContainer();
             stateTransfer.transferAllStateDataToDirectory(
+                    pathContainer,
                     Collections.singletonList(
                             new StateHandleTransferSpec(
                                     incrementalKeyedStateHandle,
-                                    Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder)))),
+                                    pathContainer.getRemoteForStPath())),
                     new CloseableRegistry(),
                     RecoveryClaimMode.DEFAULT);
             fail();
@@ -494,8 +505,12 @@ class ForStStateDataTransferTest extends TestLogger {
         }
 
         try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(4)) {
+            ForStPathContainer pathContainer = createPathContainer();
             stateTransfer.transferAllStateDataToDirectory(
-                    transferRequests, new CloseableRegistry(), RecoveryClaimMode.DEFAULT);
+                    pathContainer,
+                    transferRequests,
+                    new CloseableRegistry(),
+                    RecoveryClaimMode.DEFAULT);
         }
 
         for (int i = 0; i < numRemoteHandles; ++i) {
@@ -540,8 +555,9 @@ class ForStStateDataTransferTest extends TestLogger {
 
         CloseableRegistry closeableRegistry = new CloseableRegistry();
         try (ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(5)) {
+            ForStPathContainer pathContainer = createPathContainer();
             stateTransfer.transferAllStateDataToDirectory(
-                    transferRequests, closeableRegistry, RecoveryClaimMode.DEFAULT);
+                    pathContainer, transferRequests, closeableRegistry, RecoveryClaimMode.DEFAULT);
             fail("Exception is expected");
         } catch (IOException ignore) {
         }

Reply via email to