This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new 3d3182a35dc [FLINK-38336][state/forst] Avoid data copy during failover
for ForSt statebackend (#27109)
3d3182a35dc is described below
commit 3d3182a35dcb156b515abe5838d8fdb96baad316
Author: AlexYinHan <[email protected]>
AuthorDate: Tue Oct 14 18:07:34 2025 +0800
[FLINK-38336][state/forst] Avoid data copy during failover for ForSt
statebackend (#27109)
(cherry picked from commit 78f6e779137bfb732862f237ae6e70b94d0e00ed)
---
.../flink/state/forst/ForStKeyedStateBackend.java | 14 +-
.../state/forst/ForStKeyedStateBackendBuilder.java | 15 +-
.../flink/state/forst/ForStPathContainer.java | 193 +++++++++++++++++++++
.../flink/state/forst/ForStResourceContainer.java | 114 +++++-------
.../flink/state/forst/ForStStateBackend.java | 52 +++---
.../datatransfer/DataTransferStrategyBuilder.java | 39 ++++-
.../forst/datatransfer/ForStStateDataTransfer.java | 10 +-
.../restore/ForStIncrementalRestoreOperation.java | 15 +-
.../forst/sync/ForStSyncKeyedStateBackend.java | 11 +-
.../sync/ForStSyncKeyedStateBackendBuilder.java | 4 +-
.../apache/flink/state/forst/ForStExtension.java | 10 +-
.../state/forst/ForStResourceContainerTest.java | 12 +-
.../state/forst/ForStStateBackendConfigTest.java | 27 ++-
.../flink/state/forst/ForStStateBackendTest.java | 6 +-
.../datatransfer/DataTransferStrategyTest.java | 78 ++++++++-
.../datatransfer/ForStStateDataTransferTest.java | 22 ++-
16 files changed, 460 insertions(+), 162 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 635d29b8c41..1efde10dfdf 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -596,18 +596,14 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
IOUtils.closeQuietly(db);
LOG.info(
- "Closed ForSt State Backend. Cleaning up ForSt local
working directory {}, remote working directory {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath());
+ "Closed ForSt State Backend. Cleaning up ForSt: {}.",
+ optionsContainer.getPathContainer());
try {
optionsContainer.clearDirectories();
} catch (Exception ex) {
LOG.warn(
- "Could not delete ForSt local working directory
{}, remote working directory {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath(),
- ex);
+ "Could not delete ForSt: {}.",
optionsContainer.getPathContainer(), ex);
}
IOUtils.closeQuietly(optionsContainer);
@@ -624,12 +620,12 @@ public class ForStKeyedStateBackend<K> implements
AsyncKeyedStateBackend<K> {
@VisibleForTesting
Path getLocalBasePath() {
- return optionsContainer.getLocalBasePath();
+ return optionsContainer.getPathContainer().getLocalBasePath();
}
@VisibleForTesting
Path getRemoteBasePath() {
- return optionsContainer.getRemoteBasePath();
+ return optionsContainer.getPathContainer().getRemoteBasePath();
}
@Override
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index 31473eda4c4..d9b2bc3dd58 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -302,11 +302,7 @@ public class ForStKeyedStateBackendBuilder<K>
// deletion in file mapping manager.
optionsContainer.forceClearRemoteDirectories();
} catch (Exception ex) {
- logger.warn(
- "Failed to delete ForSt local base path {}, remote
base path {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath(),
- ex);
+ logger.warn("Failed to delete ForSt: {}.",
optionsContainer.getPathContainer(), ex);
}
IOUtils.closeQuietly(optionsContainer);
IOUtils.closeQuietly(snapshotStrategy);
@@ -322,9 +318,8 @@ public class ForStKeyedStateBackendBuilder<K>
InternalKeyContext<K> keyContext =
new InternalKeyContextImpl<>(keyGroupRange, numberOfKeyGroups);
logger.info(
- "Finished building ForSt keyed state-backend at local base
path: {}, remote base path: {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath());
+ "Finished building ForSt keyed state-backend at {}",
+ optionsContainer.getPathContainer());
return new ForStKeyedStateBackend<>(
backendUID,
executionConfig,
@@ -360,8 +355,8 @@ public class ForStKeyedStateBackendBuilder<K>
// working dir. We will implement this in ForStDB later, but before
that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
Path instanceForStPath =
- optionsContainer.getRemoteForStPath() == null
- ? optionsContainer.getLocalForStPath()
+ optionsContainer.getPathContainer().getRemoteForStPath() ==
null
+ ?
optionsContainer.getPathContainer().getLocalForStPath()
: new Path("/db");
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
new file mode 100644
index 00000000000..4cad55c26c5
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Container for the local and remote paths used by a ForSt instance.
+ *
+ * <p>Each side (local and remote) has three levels: the job path, the base path of the owning
+ * subtask, and the ForSt DB path ({@code <basePath>/db}). Any of them may be {@code null}. The
+ * combined resolvers {@link #getJobPath()}, {@link #getBasePath()} and {@link #getDbPath()} prefer
+ * the remote path and fall back to the local one.
+ */
+public class ForStPathContainer {
+
+    // Use this class (not ForStResourceContainer) so log lines are attributed correctly.
+    private static final Logger LOG = LoggerFactory.getLogger(ForStPathContainer.class);
+    public static final String DB_DIR_STRING = "db";
+
+    /**
+     * Local job path. This indicates the parent directory of ForSt, which ends with the Flink
+     * JobID.
+     */
+    @Nullable private final Path localJobPath;
+
+    /**
+     * Local base path. This includes the information of the subtask that holds ForSt, such as the
+     * Operator Identifier and subtask index.
+     */
+    @Nullable private final Path localBasePath;
+
+    /** Local ForSt path. This is the directory of ForSt DB, which ends with 'db'. */
+    @Nullable private final Path localForStPath;
+
+    /**
+     * Remote paths of ForSt. Similar to the respective Path mentioned above, but located under the
+     * remote parent path.
+     */
+    @Nullable private final Path remoteJobPath;
+
+    @Nullable private final Path remoteBasePath;
+    @Nullable private final Path remoteForStPath;
+
+    /** Creates a container in which every local and remote path is {@code null}. */
+    public static ForStPathContainer empty() {
+        return of(null, null, null, null);
+    }
+
+    /**
+     * Creates a container holding only local paths; all remote paths are {@code null}.
+     *
+     * @param localJobPath the local job directory, may be {@code null}
+     * @param localBasePath the local base directory of the subtask, may be {@code null}
+     */
+    public static ForStPathContainer ofLocal(
+            @Nullable Path localJobPath, @Nullable Path localBasePath) {
+        return new ForStPathContainer(localJobPath, localBasePath, null, null);
+    }
+
+    /**
+     * Creates a container from the given local and remote job/base paths.
+     *
+     * @param localJobPath the local job directory, may be {@code null}
+     * @param localBasePath the local base directory of the subtask, may be {@code null}
+     * @param remoteJobPath the remote job directory, may be {@code null}
+     * @param remoteBasePath the remote base directory of the subtask, may be {@code null}
+     */
+    public static ForStPathContainer of(
+            @Nullable Path localJobPath,
+            @Nullable Path localBasePath,
+            @Nullable Path remoteJobPath,
+            @Nullable Path remoteBasePath) {
+        return new ForStPathContainer(localJobPath, localBasePath, remoteJobPath, remoteBasePath);
+    }
+
+    /**
+     * Builds the container. The ForSt DB paths are derived by appending {@link #DB_DIR_STRING} to
+     * the respective base path (or left {@code null} when that base path is {@code null}). The
+     * resulting paths are logged at INFO level.
+     */
+    public ForStPathContainer(
+            @Nullable Path localJobPath,
+            @Nullable Path localBasePath,
+            @Nullable Path remoteJobPath,
+            @Nullable Path remoteBasePath) {
+        this.localJobPath = localJobPath;
+        this.localBasePath = localBasePath;
+        this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
+
+        this.remoteJobPath = remoteJobPath;
+        this.remoteBasePath = remoteBasePath;
+        this.remoteForStPath =
+                remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
+
+        LOG.info(
+                "ForStPathContainer: localJobPath: {}, localBasePath: {}, localForStPath:{}, remoteJobPath: {}, remoteBasePath: {}, remoteForStPath: {}",
+                localJobPath,
+                localBasePath,
+                localForStPath,
+                remoteJobPath,
+                remoteBasePath,
+                remoteForStPath);
+    }
+
+    public @Nullable Path getLocalJobPath() {
+        return localJobPath;
+    }
+
+    public @Nullable Path getLocalBasePath() {
+        return localBasePath;
+    }
+
+    public @Nullable Path getLocalForStPath() {
+        return localForStPath;
+    }
+
+    public @Nullable Path getRemoteJobPath() {
+        return remoteJobPath;
+    }
+
+    public @Nullable Path getRemoteBasePath() {
+        return remoteBasePath;
+    }
+
+    public @Nullable Path getRemoteForStPath() {
+        return remoteForStPath;
+    }
+
+    /**
+     * Returns the remote job path if set, otherwise the local job path. May be {@code null} when
+     * neither is set.
+     */
+    public @Nullable Path getJobPath() {
+        if (remoteJobPath != null) {
+            return remoteJobPath;
+        } else {
+            return localJobPath;
+        }
+    }
+
+    /**
+     * Returns the remote base path if set, otherwise the local base path. May be {@code null} when
+     * neither is set.
+     */
+    public @Nullable Path getBasePath() {
+        if (remoteBasePath != null) {
+            return remoteBasePath;
+        } else {
+            return localBasePath;
+        }
+    }
+
+    /**
+     * Returns the remote ForSt DB path if set, otherwise the local ForSt DB path. May be {@code
+     * null} when neither is set.
+     */
+    public @Nullable Path getDbPath() {
+        if (remoteForStPath != null) {
+            return remoteForStPath;
+        } else {
+            return localForStPath;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ForStPathContainer(localJobPath = ["
+                + localJobPath
+                + "] localBasePath = ["
+                + localBasePath
+                + "] localForStPath = ["
+                + localForStPath
+                + "] remoteJobPath = ["
+                + remoteJobPath
+                + "] remoteBasePath = ["
+                + remoteBasePath
+                + "] remoteForStPath = ["
+                + remoteForStPath
+                + "])";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ForStPathContainer that = (ForStPathContainer) o;
+        return Objects.equals(localJobPath, that.localJobPath)
+                && Objects.equals(localBasePath, that.localBasePath)
+                && Objects.equals(localForStPath, that.localForStPath)
+                && Objects.equals(remoteJobPath, that.remoteJobPath)
+                && Objects.equals(remoteBasePath, that.remoteBasePath)
+                && Objects.equals(remoteForStPath, that.remoteForStPath);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                localJobPath,
+                localBasePath,
+                localForStPath,
+                remoteJobPath,
+                remoteBasePath,
+                remoteForStPath);
+    }
+}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index d4c5a7cc831..cb29cf1857c 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -73,8 +73,6 @@ import static
org.apache.flink.state.forst.ForStOptions.CACHE_SIZE_BASE_LIMIT;
public final class ForStResourceContainer implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(ForStResourceContainer.class);
- public static final String DB_DIR_STRING = "db";
-
private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
// the filename length limit is 255 on most operating systems
@@ -82,16 +80,10 @@ public final class ForStResourceContainer implements
AutoCloseable {
// and the db data dir's absolute path will be used as the log file name's
prefix.
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 -
FORST_RELOCATE_LOG_SUFFIX.length();
- @Nullable private final Path remoteBasePath;
-
- @Nullable private final Path remoteForStPath;
+ private final ForStPathContainer pathContainer;
private boolean remotePathNewlyCreated;
- @Nullable private final Path localBasePath;
-
- @Nullable private final Path localForStPath;
-
@Nullable private Path cacheBasePath;
private final long cacheCapacity;
@@ -130,8 +122,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
new Configuration(),
null,
null,
- null,
- null,
+ ForStPathContainer.empty(),
RecoveryClaimMode.DEFAULT,
null,
null,
@@ -144,8 +135,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
new Configuration(),
optionsFactory,
null,
- null,
- null,
+ ForStPathContainer.empty(),
RecoveryClaimMode.DEFAULT,
null,
null,
@@ -160,8 +150,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
new Configuration(),
optionsFactory,
sharedResources,
- null,
- null,
+ ForStPathContainer.empty(),
RecoveryClaimMode.DEFAULT,
null,
null,
@@ -172,8 +161,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
ReadableConfig configuration,
@Nullable ForStOptionsFactory optionsFactory,
@Nullable OpaqueMemoryResource<ForStSharedResources>
sharedResources,
- @Nullable Path localBasePath,
- @Nullable Path remoteBasePath,
+ ForStPathContainer pathContainer,
RecoveryClaimMode claimMode,
@Nullable CheckpointStorageAccess checkpointStorageAccess,
MetricGroup metricGroup,
@@ -183,11 +171,7 @@ public final class ForStResourceContainer implements
AutoCloseable {
this.optionsFactory = optionsFactory;
this.sharedResources = sharedResources;
- this.localBasePath = localBasePath;
- this.localForStPath = localBasePath != null ? new Path(localBasePath,
DB_DIR_STRING) : null;
- this.remoteBasePath = remoteBasePath;
- this.remoteForStPath =
- remoteBasePath != null ? new Path(remoteBasePath,
DB_DIR_STRING) : null;
+ this.pathContainer = pathContainer;
this.enableStatistics = enableStatistics;
this.handlesToClose = new ArrayList<>();
@@ -228,10 +212,10 @@ public final class ForStResourceContainer implements
AutoCloseable {
// TODO: Fallback to checkpoint directory when checkpoint feature is
ready if not
// configured,
// fallback to local directory currently temporarily.
- if (remoteForStPath != null) {
+ if (pathContainer.getRemoteForStPath() != null) {
FlinkEnv flinkEnv =
new FlinkEnv(
- remoteBasePath.toString(),
+ pathContainer.getRemoteBasePath().toString(),
new StringifiedForStFileSystem(forStFileSystem));
opt.setEnv(flinkEnv);
handlesToClose.add(flinkEnv);
@@ -312,40 +296,16 @@ public final class ForStResourceContainer implements
AutoCloseable {
return opt;
}
- @Nullable
- public Path getLocalBasePath() {
- return localBasePath;
- }
-
- @Nullable
- public Path getLocalForStPath() {
- return localForStPath;
- }
-
- @Nullable
- public Path getRemoteBasePath() {
- return remoteBasePath;
- }
-
- @Nullable
- public Path getRemoteForStPath() {
- return remoteForStPath;
+ public ForStPathContainer getPathContainer() {
+ return pathContainer;
}
public Path getBasePath() {
- if (remoteBasePath != null) {
- return remoteBasePath;
- } else {
- return localBasePath;
- }
+ return pathContainer.getBasePath();
}
public Path getDbPath() {
- if (remoteForStPath != null) {
- return remoteForStPath;
- } else {
- return localForStPath;
- }
+ return pathContainer.getDbPath();
}
public boolean isCoordinatorInline() {
@@ -370,28 +330,31 @@ public final class ForStResourceContainer implements
AutoCloseable {
* @throws Exception if any unexpected behaviors.
*/
public void prepareDirectories() throws Exception {
- if (remoteBasePath != null && remoteForStPath != null) {
- remotePathNewlyCreated = prepareDirectories(remoteBasePath,
remoteForStPath);
+ if (pathContainer.getRemoteBasePath() != null
+ && pathContainer.getRemoteForStPath() != null) {
+ remotePathNewlyCreated =
+ prepareDirectories(
+ pathContainer.getRemoteBasePath(),
pathContainer.getRemoteForStPath());
}
- if (localBasePath != null && localForStPath != null) {
- prepareDirectories(
- new Path(localBasePath.getPath()), new
Path(localForStPath.getPath()));
+ if (pathContainer.getLocalBasePath() != null &&
pathContainer.getLocalForStPath() != null) {
+ prepareDirectories(pathContainer.getLocalBasePath(),
pathContainer.getLocalForStPath());
}
- if (remoteForStPath != null && localForStPath != null) {
- if (cacheBasePath == null && localBasePath != null) {
- cacheBasePath = new Path(localBasePath.getPath(), "cache");
+ if (pathContainer.getRemoteForStPath() != null
+ && pathContainer.getLocalForStPath() != null) {
+ if (cacheBasePath == null && pathContainer.getLocalBasePath() !=
null) {
+ cacheBasePath = new
Path(pathContainer.getLocalBasePath().getPath(), "cache");
LOG.info(
"Cache base path is not configured, set to local base
path: {}",
cacheBasePath);
}
forStFileSystem =
ForStFlinkFileSystem.get(
- remoteForStPath.toUri(),
- localForStPath,
+ pathContainer.getRemoteForStPath().toUri(),
+ pathContainer.getLocalForStPath(),
ForStFlinkFileSystem.getFileBasedCache(
configuration,
cacheBasePath,
- remoteForStPath,
+ pathContainer.getRemoteForStPath(),
cacheCapacity,
cacheReservedSize,
metricGroup));
@@ -432,17 +395,19 @@ public final class ForStResourceContainer implements
AutoCloseable {
* @throws Exception if any unexpected behaviors.
*/
public void clearDirectories() throws Exception {
+ Path remoteBasePath = pathContainer.getRemoteBasePath();
if (remoteBasePath != null) {
forStFileSystem.delete(remoteBasePath, true);
}
+ Path localBasePath = pathContainer.getLocalBasePath();
if (localBasePath != null) {
clearDirectories(localBasePath);
}
}
public void forceClearRemoteDirectories() throws Exception {
- if (remoteBasePath != null && remotePathNewlyCreated) {
- clearDirectories(remoteBasePath);
+ if (pathContainer.getRemoteBasePath() != null &&
remotePathNewlyCreated) {
+ clearDirectories(pathContainer.getRemoteBasePath());
}
}
@@ -529,9 +494,10 @@ public final class ForStResourceContainer implements
AutoCloseable {
String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
if (logDir == null || logDir.isEmpty()) {
// only relocate db log dir in local mode
- if (remoteForStPath == null
- && localForStPath != null
- && localForStPath.getPath().length() <=
INSTANCE_PATH_LENGTH_LIMIT) {
+ if (pathContainer.getRemoteForStPath() == null
+ && pathContainer.getLocalForStPath() != null
+ && pathContainer.getLocalForStPath().getPath().length()
+ <= INSTANCE_PATH_LENGTH_LIMIT) {
relocateDefaultDbLogDir(currentOptions);
}
} else {
@@ -647,9 +613,10 @@ public final class ForStResourceContainer implements
AutoCloseable {
// log dir, which results in "/" being used as the log directory. This
often has permission
// issues, so the db log dir is temporarily set explicitly here.
// TODO: remove this method after ForSt deal log dir well
- if (localForStPath != null) {
- this.relocatedDbLogBaseDir =
java.nio.file.Path.of(localForStPath.toUri().toString());
- dbOptions.setDbLogDir(localForStPath.getPath());
+ if (pathContainer.getLocalForStPath() != null) {
+ this.relocatedDbLogBaseDir =
+
java.nio.file.Path.of(pathContainer.getLocalForStPath().toUri().toString());
+ dbOptions.setDbLogDir(pathContainer.getLocalForStPath().getPath());
}
}
@@ -666,10 +633,11 @@ public final class ForStResourceContainer implements
AutoCloseable {
/** Clean all relocated ForSt logs. */
private void cleanRelocatedDbLogs() {
- if (localForStPath != null && relocatedDbLogBaseDir != null) {
+ if (pathContainer.getLocalForStPath() != null && relocatedDbLogBaseDir
!= null) {
LOG.info("Cleaning up relocated ForSt logs: {}.",
relocatedDbLogBaseDir);
- String relocatedDbLogPrefix =
resolveRelocatedDbLogPrefix(localForStPath.getPath());
+ String relocatedDbLogPrefix =
+
resolveRelocatedDbLogPrefix(pathContainer.getLocalForStPath().getPath());
try {
Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
.filter(
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index 328ff9eb709..e8f51c0c631 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -418,7 +417,8 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
lazyInitializeForJob(env, fileCompatibleIdentifier);
- Tuple2<Path, Path> localAndRemoteBasePath =
getForStBasePath(fileCompatibleIdentifier, env);
+ ForStPathContainer pathContainer =
+ createForStPathContainer(fileCompatibleIdentifier, env, false);
final OpaqueMemoryResource<ForStSharedResources> sharedResources =
ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -433,8 +433,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
- localAndRemoteBasePath.f0,
- localAndRemoteBasePath.f1,
+ pathContainer,
env.getCheckpointStorageAccess(),
parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());
@@ -490,7 +489,8 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
lazyInitializeForJob(env, fileCompatibleIdentifier);
- Tuple2<Path, Path> localAndRemoteBasePath =
getForStBasePath(fileCompatibleIdentifier, env);
+ ForStPathContainer pathContainer =
+ createForStPathContainer(fileCompatibleIdentifier, env,
forceSyncLocal);
LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();
@@ -508,8 +508,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
final ForStResourceContainer resourceContainer =
createOptionsAndResourceContainer(
sharedResources,
- localAndRemoteBasePath.f0,
- forceSyncLocal ? null : localAndRemoteBasePath.f1,
+ pathContainer,
env.getCheckpointStorageAccess(),
parameters.getMetricGroup(),
nativeMetricOptions.isStatisticsEnabled());
@@ -795,25 +794,32 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
return configuration;
}
- Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment
env) {
+ ForStPathContainer createForStPathContainer(
+ String operatorIdentifier, Environment env, boolean forceLocal) {
String opChildPath =
String.format(
"op_%s_attempt_%s",
operatorIdentifier,
env.getTaskInfo().getAttemptNumber());
- Path localBasePath =
- new Path(
- new File(new File(getNextStoragePath(),
jobId.toHexString()), opChildPath)
- .getAbsolutePath());
+ File localJobFile = new File(getNextStoragePath(),
jobId.toHexString());
+ Path localJobPath = new Path(localJobFile.getPath());
+ Path localBasePath = new Path(new File(localJobFile,
opChildPath).getAbsolutePath());
+
+ if (forceLocal) {
+ return ForStPathContainer.ofLocal(localJobPath, localBasePath);
+ }
+
+ Path remoteJobPath = null;
Path remoteBasePath = null;
if (remoteForStDirectory != null) {
- remoteBasePath =
- new Path(new Path(remoteForStDirectory,
jobId.toHexString()), opChildPath);
+ remoteJobPath = new Path(remoteForStDirectory,
jobId.toHexString());
+ remoteBasePath = new Path(remoteJobPath, opChildPath);
} else if (remoteShareWithCheckpoint) {
if (env.getCheckpointStorageAccess() instanceof
FsCheckpointStorageAccess) {
- Path sharedStateDirectory =
- ((FsCheckpointStorageAccess)
env.getCheckpointStorageAccess())
- .getSharedStateDirectory();
+ FsCheckpointStorageAccess fsCheckpointStorageAccess =
+ (FsCheckpointStorageAccess)
env.getCheckpointStorageAccess();
+ remoteJobPath =
fsCheckpointStorageAccess.getCheckpointsDirectory();
+ Path sharedStateDirectory =
fsCheckpointStorageAccess.getSharedStateDirectory();
remoteBasePath = new Path(sharedStateDirectory, opChildPath);
LOG.info("Set remote ForSt directory to checkpoint directory
{}", remoteBasePath);
} else {
@@ -821,19 +827,20 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
"Remote ForSt directory can't be set, because
checkpoint directory isn't on file system.");
}
}
- return Tuple2.of(localBasePath, remoteBasePath);
+
+ return ForStPathContainer.of(localJobPath, localBasePath,
remoteJobPath, remoteBasePath);
}
@VisibleForTesting
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path
localBasePath) {
- return createOptionsAndResourceContainer(null, localBasePath, null,
null, null, false);
+ return createOptionsAndResourceContainer(
+ null, ForStPathContainer.ofLocal(null, localBasePath), null,
null, false);
}
@VisibleForTesting
private ForStResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<ForStSharedResources>
sharedResources,
- @Nullable Path localBasePath,
- @Nullable Path remoteBasePath,
+ ForStPathContainer pathContainer,
@Nullable CheckpointStorageAccess checkpointStorageAccess,
@Nullable MetricGroup metricGroup,
boolean enableStatistics) {
@@ -842,8 +849,7 @@ public class ForStStateBackend extends
AbstractManagedMemoryStateBackend
configurableOptions != null ? configurableOptions : new
Configuration(),
forStOptionsFactory,
sharedResources,
- localBasePath,
- remoteBasePath,
+ pathContainer,
recoveryClaimMode,
checkpointStorageAccess,
metricGroup,
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
index 68d54b17799..29cb9be25ae 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
@@ -29,6 +29,7 @@ import
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
@@ -72,12 +73,12 @@ public class DataTransferStrategyBuilder {
forStFlinkFileSystem == null
? new CopyDataTransferStrategy()
: new
CopyDataTransferStrategy(forStFlinkFileSystem);
- LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
+ LOG.debug("Build DataTransferStrategy for Snapshot: {}", strategy);
return strategy;
}
strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
- LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
+ LOG.debug("Build DataTransferStrategy for Snapshot: {}", strategy);
return strategy;
}
@@ -122,29 +123,34 @@ public class DataTransferStrategyBuilder {
public static DataTransferStrategy buildForRestore(
@Nullable ForStFlinkFileSystem forStFlinkFileSystem,
+ ForStPathContainer forStPathContainer,
Collection<StateHandleTransferSpec> specs,
RecoveryClaimMode recoveryClaimMode) {
DataTransferStrategy strategy;
FileSystem cpSharedFs = getSharedStateFileSystem(specs);
+ boolean isDbUnderSameJobPathFromRestore =
+ isDbUnderSameJobPathFromRestore(forStPathContainer, specs);
if (forStFlinkFileSystem == null
|| cpSharedFs == null
|| !forStFlinkFileSystem.getUri().equals(cpSharedFs.getUri())
- || recoveryClaimMode == RecoveryClaimMode.NO_CLAIM) {
+ || (!isDbUnderSameJobPathFromRestore
+ && recoveryClaimMode == RecoveryClaimMode.NO_CLAIM)) {
strategy =
forStFlinkFileSystem == null
? new CopyDataTransferStrategy()
: new
CopyDataTransferStrategy(forStFlinkFileSystem);
- LOG.info(
- "Build DataTransferStrategy for Restore: {},
forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}",
+ LOG.debug(
+ "Build DataTransferStrategy for Restore: {},
forStFlinkFileSystem: {}, cpSharedFs:{}, isDbUnderSameJobPathFromRestore:{},
recoveryClaimMode:{}",
strategy,
forStFlinkFileSystem,
cpSharedFs,
+ isDbUnderSameJobPathFromRestore,
recoveryClaimMode);
return strategy;
}
strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
- LOG.info("Build DataTransferStrategy for Restore: {}", strategy);
+ LOG.debug("Build DataTransferStrategy for Restore: {}", strategy);
return strategy;
}
@@ -168,4 +174,25 @@ public class DataTransferStrategyBuilder {
}
return null;
}
+
+    /**
+     * Checks whether every file-based shared state handle in {@code specs} is located under the
+     * job path of {@code forStPathContainer}. A match indicates that the job is being restored from
+     * a failover of the same job.
+     *
+     * <p>Returns {@code false} when the container has no job path (e.g. a container created via
+     * {@code ForStPathContainer.empty()}), so callers fall back to the claim-mode based decision
+     * instead of failing with a NullPointerException. Prefix matching is done on a path-segment
+     * boundary so that e.g. {@code /a/job1} does not match {@code /a/job10/...}.
+     *
+     * @param forStPathContainer paths of the ForSt instance being restored
+     * @param specs the transfer specs whose shared state handles are checked
+     * @return {@code true} if all file-based shared state lies under the job path
+     */
+    private static boolean isDbUnderSameJobPathFromRestore(
+            ForStPathContainer forStPathContainer, Collection<StateHandleTransferSpec> specs) {
+        Path jobPath = forStPathContainer.getJobPath();
+        if (jobPath == null) {
+            return false;
+        }
+        String jobPathStr = jobPath.getPath();
+        // Require a separator after the job path so sibling directories sharing a name prefix
+        // are not treated as children.
+        String jobPathPrefix = jobPathStr.endsWith("/") ? jobPathStr : jobPathStr + "/";
+        for (StateHandleTransferSpec spec : specs) {
+            IncrementalRemoteKeyedStateHandle stateHandle = spec.getStateHandle();
+            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath :
+                    stateHandle.getSharedState()) {
+                StreamStateHandle handle = handleAndLocalPath.getHandle();
+                if (handle instanceof FileStateHandle) {
+                    String filePathStr = ((FileStateHandle) handle).getFilePath().getPath();
+                    if (!filePathStr.equals(jobPathStr) && !filePathStr.startsWith(jobPathPrefix)) {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
index 4dfc9c0a91f..0305f8cd02e 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransfer.java
@@ -29,6 +29,7 @@ import
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.state.forst.ForStKeyedStateBackend;
+import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.util.ExceptionUtils;
@@ -231,6 +232,7 @@ public class ForStStateDataTransfer implements Closeable {
* @throws Exception If anything about the transfer goes wrong.
*/
public void transferAllStateDataToDirectory(
+ ForStPathContainer forStPathContainer,
Collection<StateHandleTransferSpec> transferSpecs,
CloseableRegistry closeableRegistry,
RecoveryClaimMode recoveryClaimMode)
@@ -244,7 +246,10 @@ public class ForStStateDataTransfer implements Closeable {
try {
List<CompletableFuture<Void>> futures =
transferAllStateDataToDirectoryAsync(
- transferSpecs, internalCloser,
recoveryClaimMode)
+ forStPathContainer,
+ transferSpecs,
+ internalCloser,
+ recoveryClaimMode)
.collect(Collectors.toList());
// Wait until either all futures completed successfully or one
failed exceptionally.
@@ -276,12 +281,13 @@ public class ForStStateDataTransfer implements Closeable {
/** Asynchronously runs the specified transfer requests on
executorService. */
private Stream<CompletableFuture<Void>>
transferAllStateDataToDirectoryAsync(
+ ForStPathContainer forStPathContainer,
Collection<StateHandleTransferSpec> transferSpecs,
CloseableRegistry closeableRegistry,
RecoveryClaimMode recoveryClaimMode) {
DataTransferStrategy strategy =
DataTransferStrategyBuilder.buildForRestore(
- forStFs, transferSpecs, recoveryClaimMode);
+ forStFs, forStPathContainer, transferSpecs,
recoveryClaimMode);
return transferSpecs.stream()
.flatMap(
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
index 699da08a53d..f205fe21b9f 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java
@@ -45,6 +45,7 @@ import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStIncrementalCheckpointUtils;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
+import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
@@ -93,7 +94,6 @@ import static
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.check
import static
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.clipDBWithKeyGroupRange;
import static
org.apache.flink.state.forst.ForStIncrementalCheckpointUtils.findTheBestStateHandleForInitial;
import static
org.apache.flink.state.forst.ForStOperationUtils.createColumnFamilyOptions;
-import static
org.apache.flink.state.forst.ForStResourceContainer.DB_DIR_STRING;
/** Encapsulates the process of restoring a ForSt instance from an incremental
snapshot. */
public class ForStIncrementalRestoreOperation<K> implements
ForStRestoreOperation {
@@ -217,7 +217,7 @@ public class ForStIncrementalRestoreOperation<K> implements
ForStRestoreOperatio
toTransferSpecs.add(
new StateHandleTransferSpec(
restoreStateHandles.get(bestStateHandleForInit),
- new Path(forstBasePath, DB_DIR_STRING)));
+ new Path(forstBasePath,
ForStPathContainer.DB_DIR_STRING)));
}
for (int i = 0; i < restoreStateHandles.size(); i++) {
if (i != bestStateHandleForInit) {
@@ -268,7 +268,10 @@ public class ForStIncrementalRestoreOperation<K>
implements ForStRestoreOperatio
ForStStateDataTransfer.DEFAULT_THREAD_NUM,
optionsContainer.getFileSystem())) {
transfer.transferAllStateDataToDirectory(
- specs, cancelStreamRegistry, recoveryClaimMode);
+ optionsContainer.getPathContainer(),
+ specs,
+ cancelStreamRegistry,
+ recoveryClaimMode);
}
}
@@ -697,7 +700,7 @@ public class ForStIncrementalRestoreOperation<K> implements
ForStRestoreOperatio
StateHandleTransferSpec baseSpec =
new StateHandleTransferSpec(
restoreStateHandles.get(bestStateHandleForInit),
- new Path(forstBasePath, DB_DIR_STRING));
+ new Path(forstBasePath,
ForStPathContainer.DB_DIR_STRING));
transferAllStateHandles(Collections.singletonList(baseSpec));
mergeStateHandlesWithCopyFromTemporaryInstance(
baseSpec,
@@ -751,7 +754,7 @@ public class ForStIncrementalRestoreOperation<K> implements
ForStRestoreOperatio
String uuid = UUID.randomUUID().toString();
String subPathStr =
- optionsContainer.getRemoteBasePath() != null
+
optionsContainer.getPathContainer().getRemoteBasePath() != null
? exportBasePath.getName() + "/" + uuid
: exportBasePath.toString() + "/" + uuid;
ExportImportFilesMetaData exportedColumnFamilyMetaData =
@@ -953,7 +956,7 @@ public class ForStIncrementalRestoreOperation<K> implements
ForStRestoreOperatio
new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
String dbName =
- optionsContainer.getRemoteBasePath() != null
+ optionsContainer.getPathContainer().getRemoteBasePath() != null
? "/" +
stateHandleSpec.getTransferDestination().getName()
: stateHandleSpec.getTransferDestination().toString();
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
index 0cd1312cc44..2d35bd13d41 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java
@@ -489,18 +489,13 @@ public class ForStSyncKeyedStateBackend<K> extends
AbstractKeyedStateBackend<K>
columnFamilyOptions.forEach(IOUtils::closeQuietly);
LOG.info(
- "Closed ForSt State Backend. Cleaning up ForSt local
working directory {}, remote working directory {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath());
+ "Closed ForSt State Backend. Cleaning up ForSt: {}.",
+ optionsContainer.getPathContainer());
try {
optionsContainer.clearDirectories();
} catch (Exception ex) {
- LOG.warn(
- "Could not delete ForSt local working directory {},
remote working directory {}.",
- optionsContainer.getLocalBasePath(),
- optionsContainer.getRemoteBasePath(),
- ex);
+ LOG.warn("Could not delete ForSt: {}.",
optionsContainer.getPathContainer(), ex);
}
IOUtils.closeQuietly(optionsContainer);
diff --git
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
index 07de516c71d..0e83b22a0ce 100644
---
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
+++
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java
@@ -506,8 +506,8 @@ public class ForStSyncKeyedStateBackendBuilder<K> extends
AbstractKeyedStateBack
// working dir. We will implement this in ForStDB later, but before
that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
Path instanceForStPath =
- optionsContainer.getRemoteForStPath() == null
- ? optionsContainer.getLocalForStPath()
+ optionsContainer.getPathContainer().getRemoteForStPath() ==
null
+ ?
optionsContainer.getPathContainer().getLocalForStPath()
: new Path("/db");
if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
index 6e6c01280d4..99b4c5c68b5 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
@@ -164,13 +164,14 @@ public class ForStExtension implements
BeforeEachCallback, AfterEachCallback {
resourceGuard = new ResourceGuard();
File localWorkingDir = TempDirUtils.newFolder(rocksFolder.toPath(),
"local-working-dir");
+ Path localJobPath = new Path(localWorkingDir.getAbsolutePath());
+ Path localBasePath = new Path(localJobPath, "base");
this.resourceContainer =
new ForStResourceContainer(
new Configuration(),
optionsFactory,
null,
- new Path(localWorkingDir.getAbsolutePath()),
- null,
+ ForStPathContainer.ofLocal(localJobPath,
localBasePath),
null,
null,
null,
@@ -190,9 +191,10 @@ public class ForStExtension implements BeforeEachCallback,
AfterEachCallback {
// working dir. We will implement this in ForStDB later, but before
that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
// TODO: use localForStPath as dbPath after ForSt Support mixing
local-dir and remote-dir
+ ForStPathContainer pathContainer =
resourceContainer.getPathContainer();
Path instanceForStPath =
- resourceContainer.getRemoteForStPath() == null
- ? resourceContainer.getLocalForStPath()
+ pathContainer.getRemoteForStPath() == null
+ ? pathContainer.getLocalForStPath()
: new Path("/");
this.columnFamilyHandles = new ArrayList<>(1);
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index fd9ae8ea111..9085f6a1fd2 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -308,15 +308,19 @@ public class ForStResourceContainerTest {
@Test
public void testDirectoryResources() throws Exception {
- Path localBasePath = new Path(TMP_FOLDER.newFolder().getPath());
- Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath());
+ Path localJobPath = new Path(TMP_FOLDER.newFolder().getPath());
+ Path localBasePath = new Path(localJobPath, "base");
+ localBasePath.getFileSystem().mkdirs(localBasePath);
+ Path remoteJobPath = new Path(TMP_FOLDER.newFolder().getPath());
+ Path remoteBasePath = new Path(remoteJobPath, "base");
+ remoteBasePath.getFileSystem().mkdirs(remoteBasePath);
try (final ForStResourceContainer optionsContainer =
new ForStResourceContainer(
new Configuration(),
null,
null,
- localBasePath,
- remoteBasePath,
+ ForStPathContainer.of(
+ localJobPath, localBasePath, remoteJobPath,
remoteBasePath),
null,
new FsCheckpointStorageAccess(
new Path(TMP_FOLDER.newFolder().getPath()),
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
index 78b2a0bd79d..50f3ac0954b 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java
@@ -530,7 +530,14 @@ public class ForStStateBackendConfigTest {
try (ForStResourceContainer optionsContainer =
new ForStResourceContainer(
- configuration, null, null, null, null, null, null,
null, false)) {
+ configuration,
+ null,
+ null,
+ ForStPathContainer.empty(),
+ null,
+ null,
+ null,
+ false)) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(-1, dbOptions.maxOpenFiles());
@@ -615,7 +622,14 @@ public class ForStStateBackendConfigTest {
configuration.set(ForStConfigurableOptions.COMPACTION_STYLE,
CompactionStyle.UNIVERSAL);
try (final ForStResourceContainer optionsContainer =
new ForStResourceContainer(
- configuration, null, null, null, null, null, null,
null, false)) {
+ configuration,
+ null,
+ null,
+ ForStPathContainer.empty(),
+ null,
+ null,
+ null,
+ false)) {
final ColumnFamilyOptions columnFamilyOptions =
optionsContainer.getColumnOptions();
assertNotNull(columnFamilyOptions);
@@ -624,7 +638,14 @@ public class ForStStateBackendConfigTest {
try (final ForStResourceContainer optionsContainer =
new ForStResourceContainer(
- new Configuration(), null, null, null, null, null,
null, null, false)) {
+ new Configuration(),
+ null,
+ null,
+ ForStPathContainer.empty(),
+ null,
+ null,
+ null,
+ false)) {
final ColumnFamilyOptions columnFamilyOptions =
optionsContainer.getColumnOptions();
assertNotNull(columnFamilyOptions);
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
index 7c4e58e1ce6..b3d8bc8e364 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendTest.java
@@ -130,7 +130,8 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
ForStStateBackend backend = new ForStStateBackend();
ForStSyncKeyedStateBackend keyedStateBackend1 =
(ForStSyncKeyedStateBackend)
createKeyedBackend(IntSerializer.INSTANCE);
-
assertThat(keyedStateBackend1.getOptionsContainer().getRemoteBasePath()).isNull();
+
assertThat(keyedStateBackend1.getOptionsContainer().getPathContainer().getRemoteBasePath())
+ .isNull();
Configuration config = new Configuration();
config.set(ForStOptions.SYNC_ENFORCE_LOCAL, false);
backend = backend.configure(config,
Thread.currentThread().getContextClassLoader());
@@ -151,6 +152,7 @@ class ForStStateBackendTest extends
StateBackendTestBase<ForStStateBackend> {
Collections.emptyList(),
new CloseableRegistry(),
1.0d));
-
assertThat(keyedStateBackend2.getOptionsContainer().getRemoteBasePath()).isNotNull();
+
assertThat(keyedStateBackend2.getOptionsContainer().getPathContainer().getRemoteBasePath())
+ .isNotNull();
}
}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
index 73f3f46df4f..deddefe8835 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
+import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
@@ -471,11 +472,14 @@ public class DataTransferStrategyTest {
void testRestoreStrategyAsExpected(
@Nullable ForStFlinkFileSystem forStFlinkFileSystem,
+ String sourceDirectoryStr,
+ String desDirStr,
RecoveryClaimMode recoveryClaimMode,
Class<?> expected) {
List<HandleAndLocalPath> sharedStateHandleList = new ArrayList<>();
sharedStateHandleList.add(
- HandleAndLocalPath.of(new FileStateHandle(new Path("1.sst"),
0), "1.sst"));
+ HandleAndLocalPath.of(
+ new FileStateHandle(new Path(sourceDirectoryStr +
"/1.sst"), 0), "1.sst"));
IncrementalRemoteKeyedStateHandle stateHandle =
new IncrementalRemoteKeyedStateHandle(
UUID.randomUUID(),
@@ -484,12 +488,15 @@ public class DataTransferStrategyTest {
sharedStateHandleList,
Collections.emptyList(),
new FileStateHandle(new Path("meta"), 0));
+ Path destJobDir = new Path(desDirStr);
+ Path destBaseDir = new Path(destJobDir, "base");
assertThat(
DataTransferStrategyBuilder.buildForRestore(
forStFlinkFileSystem,
+ ForStPathContainer.of(null, null,
destJobDir, destBaseDir),
Collections.singletonList(
new StateHandleTransferSpec(
- stateHandle, new
Path("dst"))),
+ stateHandle,
destBaseDir)),
recoveryClaimMode)
.getClass())
.isEqualTo(expected);
@@ -524,19 +531,76 @@ public class DataTransferStrategyTest {
CopyDataTransferStrategy.class);
testRestoreStrategyAsExpected(
- forStFlinkFileSystem, RecoveryClaimMode.CLAIM,
ReusableDataTransferStrategy.class);
+ forStFlinkFileSystem,
+ "/src-dir",
+ "/dst-dir",
+ RecoveryClaimMode.CLAIM,
+ ReusableDataTransferStrategy.class);
+
+ testRestoreStrategyAsExpected(
+ forStFlinkFileSystem,
+ "/src-dir",
+ "/dst-dir",
+ RecoveryClaimMode.NO_CLAIM,
+ CopyDataTransferStrategy.class);
+
+ testRestoreStrategyAsExpected(
+ forStFlinkFileSystem,
+ "/src-dir",
+ "/dst-dir",
+ RecoveryClaimMode.LEGACY,
+ ReusableDataTransferStrategy.class);
+
+ testRestoreStrategyAsExpected(
+ null,
+ "/src-dir",
+ "/dst-dir",
+ RecoveryClaimMode.CLAIM,
+ CopyDataTransferStrategy.class);
+
+ testRestoreStrategyAsExpected(
+ null,
+ "/src-dir",
+ "/dst-dir",
+ RecoveryClaimMode.NO_CLAIM,
+ CopyDataTransferStrategy.class);
+ // Restoring from the same directory indicates a failover scenario,
allowing us to reuse the
+ // files if we are in a disaggregated setup.
testRestoreStrategyAsExpected(
- forStFlinkFileSystem, RecoveryClaimMode.NO_CLAIM,
CopyDataTransferStrategy.class);
+ forStFlinkFileSystem,
+ "/same-dir",
+ "/same-dir",
+ RecoveryClaimMode.CLAIM,
+ ReusableDataTransferStrategy.class);
testRestoreStrategyAsExpected(
- forStFlinkFileSystem, RecoveryClaimMode.LEGACY,
ReusableDataTransferStrategy.class);
+ forStFlinkFileSystem,
+ "/same-dir",
+ "/same-dir",
+ RecoveryClaimMode.NO_CLAIM,
+ ReusableDataTransferStrategy.class);
testRestoreStrategyAsExpected(
- null, RecoveryClaimMode.CLAIM, CopyDataTransferStrategy.class);
+ forStFlinkFileSystem,
+ "/same-dir",
+ "/same-dir",
+ RecoveryClaimMode.LEGACY,
+ ReusableDataTransferStrategy.class);
testRestoreStrategyAsExpected(
- null, RecoveryClaimMode.NO_CLAIM,
CopyDataTransferStrategy.class);
+ null,
+ "/same-dir",
+ "/same-dir",
+ RecoveryClaimMode.CLAIM,
+ CopyDataTransferStrategy.class);
+
+ testRestoreStrategyAsExpected(
+ null,
+ "/same-dir",
+ "/same-dir",
+ RecoveryClaimMode.NO_CLAIM,
+ CopyDataTransferStrategy.class);
}
@TestTemplate
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
index bfa90404adf..3b161ad1384 100644
---
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
+++
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/datatransfer/ForStStateDataTransferTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.state.forst.ForStPathContainer;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.IOUtils;
@@ -71,6 +72,14 @@ class ForStStateDataTransferTest extends TestLogger {
@TempDir private java.nio.file.Path temporaryFolder;
+ private ForStPathContainer createPathContainer() throws IOException {
+ Path localJobPath =
Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder));
+ Path localBasePath = new Path(localJobPath, "base");
+ Path remoteJobPath =
Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder));
+ Path remoteBasePath = new Path(remoteJobPath, "base");
+ return ForStPathContainer.of(localJobPath, localBasePath,
remoteJobPath, remoteBasePath);
+ }
+
/** Test that the exception arose in the thread pool will rethrow to the
main thread. */
@Test
void testMultiThreadTransferThreadPoolExceptionRethrow() throws
IOException {
@@ -465,11 +474,13 @@ class ForStStateDataTransferTest extends TestLogger {
stateHandle);
try (ForStStateDataTransfer stateTransfer = new
ForStStateDataTransfer(5)) {
+ ForStPathContainer pathContainer = createPathContainer();
stateTransfer.transferAllStateDataToDirectory(
+ pathContainer,
Collections.singletonList(
new StateHandleTransferSpec(
incrementalKeyedStateHandle,
-
Path.fromLocalFile(TempDirUtils.newFolder(temporaryFolder)))),
+ pathContainer.getRemoteForStPath())),
new CloseableRegistry(),
RecoveryClaimMode.DEFAULT);
fail();
@@ -494,8 +505,12 @@ class ForStStateDataTransferTest extends TestLogger {
}
try (ForStStateDataTransfer stateTransfer = new
ForStStateDataTransfer(4)) {
+ ForStPathContainer pathContainer = createPathContainer();
stateTransfer.transferAllStateDataToDirectory(
- transferRequests, new CloseableRegistry(),
RecoveryClaimMode.DEFAULT);
+ pathContainer,
+ transferRequests,
+ new CloseableRegistry(),
+ RecoveryClaimMode.DEFAULT);
}
for (int i = 0; i < numRemoteHandles; ++i) {
@@ -540,8 +555,9 @@ class ForStStateDataTransferTest extends TestLogger {
CloseableRegistry closeableRegistry = new CloseableRegistry();
try (ForStStateDataTransfer stateTransfer = new
ForStStateDataTransfer(5)) {
+ ForStPathContainer pathContainer = createPathContainer();
stateTransfer.transferAllStateDataToDirectory(
- transferRequests, closeableRegistry,
RecoveryClaimMode.DEFAULT);
+ pathContainer, transferRequests, closeableRegistry,
RecoveryClaimMode.DEFAULT);
fail("Exception is expected");
} catch (IOException ignore) {
}