This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 30d75f1511b2b2beac4d7f18ee20941b3bd4cbe8 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Mon May 27 20:46:03 2024 +0800 [hotfix][statebackend] Normalize ForSt working dir Co-authored-by: yhx <master...@gmail.com> --- .../flink/state/forst/ForStKeyedStateBackend.java | 4 +- .../state/forst/ForStKeyedStateBackendBuilder.java | 18 +++-- .../flink/state/forst/ForStResourceContainer.java | 78 ++++++++++++++-------- .../flink/state/forst/ForStStateBackend.java | 30 ++++----- .../state/forst/ForStResourceContainerTest.java | 8 +-- 5 files changed, 83 insertions(+), 55 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 5192536a8dc..c5ec58bccc0 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -20,6 +20,7 @@ package org.apache.flink.state.forst; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.asyncprocessing.StateExecutor; @@ -43,7 +44,6 @@ import javax.annotation.concurrent.GuardedBy; import java.io.File; import java.io.IOException; -import java.net.URI; import java.util.HashSet; import java.util.Set; import java.util.function.Function; @@ -231,7 +231,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend { } @VisibleForTesting - URI getRemoteBasePath() { + Path getRemoteBasePath() { return 
optionsContainer.getRemoteBasePath(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index d3fad83c13f..5a52d9dad6a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -37,13 +37,13 @@ import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.File; import java.util.Collection; import java.util.function.Function; import java.util.function.Supplier; @@ -180,11 +180,21 @@ public class ForStKeyedStateBackendBuilder<K> } private ForStRestoreOperation getForStRestoreOperation() { - DBOptions dbOptions = optionsContainer.getDbOptions(); + // Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will + // concatenate the dfs directory with the local directory as working dir when using flink + // env. We expect to directly use the dfs directory in flink env or local directory as + // working dir. We will implement this in ForStDB later, but before that, we achieved this + // by setting the dbPath to "/" when the dfs directory existed. + // TODO: use localForStPath as dbPath after ForSt supports mixing local-dir and remote-dir + File instanceForStPath = + optionsContainer.getRemoteForStPath() == null + ? 
optionsContainer.getLocalForStPath() + : new File("/"); + if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new ForStNoneRestoreOperation( - optionsContainer.getLocalForStPath(), - dbOptions, + instanceForStPath, + optionsContainer.getDbOptions(), columnFamilyOptionsFactory, nativeMetricOptions, metricGroup); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 2b912ca9732..a5644c3d3b7 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.util.FileUtils; import org.apache.flink.util.IOUtils; @@ -48,9 +49,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.URI; import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -71,9 +70,9 @@ public final class ForStResourceContainer implements AutoCloseable { private static final String DB_DIR_STRING = "db"; - @Nullable private final URI remoteBasePath; + @Nullable private final Path remoteBasePath; - @Nullable private final URI remoteForStPath; + @Nullable private final Path remoteForStPath; @Nullable private final File localBasePath; @@ -97,7 +96,7 @@ public final class ForStResourceContainer implements AutoCloseable { /** The 
handles to be closed when the container is closed. */ private final ArrayList<AutoCloseable> handlesToClose; - @Nullable private Path relocatedDbLogBaseDir; + @Nullable private java.nio.file.Path relocatedDbLogBaseDir; @VisibleForTesting public ForStResourceContainer() { @@ -121,7 +120,7 @@ public final class ForStResourceContainer implements AutoCloseable { @Nullable ForStOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, @Nullable File localBasePath, - @Nullable URI remoteBasePath, + @Nullable Path remoteBasePath, boolean enableStatistics) { this.configuration = configuration; @@ -132,7 +131,7 @@ public final class ForStResourceContainer implements AutoCloseable { this.localForStPath = localBasePath != null ? new File(localBasePath, DB_DIR_STRING) : null; this.remoteBasePath = remoteBasePath; this.remoteForStPath = - remoteBasePath != null ? remoteBasePath.resolve(DB_DIR_STRING) : null; + remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null; this.enableStatistics = enableStatistics; this.handlesToClose = new ArrayList<>(); @@ -259,10 +258,15 @@ public final class ForStResourceContainer implements AutoCloseable { } @Nullable - public URI getRemoteBasePath() { + public Path getRemoteBasePath() { return remoteBasePath; } + @Nullable + public Path getRemoteForStPath() { + return remoteForStPath; + } + /** * Prepare local and remote directories. 
* @@ -273,28 +277,27 @@ public final class ForStResourceContainer implements AutoCloseable { prepareDirectories(remoteBasePath, remoteForStPath); } if (localBasePath != null && localForStPath != null) { - prepareDirectories(new URI(localBasePath.getPath()), new URI(localForStPath.getPath())); + prepareDirectories( + new Path(localBasePath.getPath()), new Path(localForStPath.getPath())); } } - private static void prepareDirectories(URI basePath, URI dbPath) throws IOException { - FileSystem fileSystem = FileSystem.get(basePath); - org.apache.flink.core.fs.Path tempBasePath = new org.apache.flink.core.fs.Path(basePath), - tempDBPath = new org.apache.flink.core.fs.Path(dbPath); - if (fileSystem.exists(tempBasePath)) { - if (!fileSystem.getFileStatus(tempBasePath).isDir()) { - throw new IOException("Not a directory: " + tempBasePath); + private static void prepareDirectories(Path basePath, Path dbPath) throws IOException { + FileSystem fileSystem = basePath.getFileSystem(); + if (fileSystem.exists(basePath)) { + if (!fileSystem.getFileStatus(basePath).isDir()) { + throw new IOException("Not a directory: " + basePath); } - } else if (!fileSystem.mkdirs(tempBasePath)) { + } else if (!fileSystem.mkdirs(basePath)) { throw new IOException( - String.format("Could not create ForSt directory at %s.", tempBasePath)); + String.format("Could not create ForSt directory at %s.", basePath)); } - if (fileSystem.exists(tempDBPath)) { - fileSystem.delete(tempDBPath, true); + if (fileSystem.exists(dbPath)) { + fileSystem.delete(dbPath, true); } - if (!fileSystem.mkdirs(tempDBPath)) { + if (!fileSystem.mkdirs(dbPath)) { throw new IOException( - String.format("Could not create ForSt db directory at %s.", tempDBPath)); + String.format("Could not create ForSt db directory at %s.", dbPath)); } } @@ -308,15 +311,14 @@ public final class ForStResourceContainer implements AutoCloseable { clearDirectories(remoteBasePath); } if (localBasePath != null) { - clearDirectories(new 
URI(localBasePath.getPath())); + clearDirectories(new Path(localBasePath.getPath())); } } - private static void clearDirectories(URI basePath) throws IOException { - FileSystem fileSystem = FileSystem.get(basePath); - org.apache.flink.core.fs.Path tempBasePath = new org.apache.flink.core.fs.Path(basePath); - if (fileSystem.exists(tempBasePath)) { - fileSystem.delete(tempBasePath, true); + private static void clearDirectories(Path basePath) throws IOException { + FileSystem fileSystem = basePath.getFileSystem(); + if (fileSystem.exists(basePath)) { + fileSystem.delete(basePath, true); } } @@ -493,7 +495,27 @@ public final class ForStResourceContainer implements AutoCloseable { String relocatedDbLogDir = logFile.getParent(); this.relocatedDbLogBaseDir = new File(relocatedDbLogDir).toPath(); dbOptions.setDbLogDir(relocatedDbLogDir); + } else { + setLocalForStPathAsLogDir(dbOptions); } + } else { + setLocalForStPathAsLogDir(dbOptions); + } + } + + private void setLocalForStPathAsLogDir(DBOptions dbOptions) { + // Currently, ForStDB does not support mixing local-dir and remote-dir, and ForStDB will + // concatenate the dfs directory with the local directory as working dir when using flink + // env. We expect to directly use the dfs directory in flink env or local directory as + // working dir. We will implement this in ForStDB later, but before that, we achieved this + // by setting the dbPath to "/" when the dfs directory existed. Another problem is that when + // the system property "log.file" is not set, ForSt directly uses the instance path as the + // log dir, which results in "/" being used as the log directory. This often has permission + // issues, so the db log dir is temporarily set explicitly here. 
+ // TODO: remove this method after ForSt handles the log dir properly + if (localForStPath != null) { + this.relocatedDbLogBaseDir = localForStPath.toPath(); + dbOptions.setDbLogDir(localForStPath.getPath()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index c2521772b44..7528b4d5265 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -50,7 +50,6 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -90,7 +89,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend * configuration values will be used. The configuration will fallback to local directory by * default. TODO: fallback to checkpoint directory if not configured. */ - @Nullable private URI remoteForStDirectory; + @Nullable private Path remoteForStDirectory; /** * Base paths for ForSt directory, as configured. Null if not yet set, in which case the @@ -154,15 +153,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend this.remoteForStDirectory = original.remoteForStDirectory; } else { String remoteDirStr = config.get(ForStOptions.REMOTE_DIRECTORY); - try { - this.remoteForStDirectory = remoteDirStr == null ? null : new URI(remoteDirStr); - } catch (URISyntaxException e) { - throw new RuntimeException( - String.format( - "Exception when transform %s to URI, the value is: %s", - ForStOptions.REMOTE_DIRECTORY.key(), remoteDirStr), - e); - } + this.remoteForStDirectory = remoteDirStr == null ? 
null : new Path(remoteDirStr); } // configure local directories @@ -308,12 +299,17 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend lazyInitializeForJob(env, fileCompatibleIdentifier); - String childPath = - "job_" + jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID(); + String opChildPath = + String.format( + "op_%s_attempt_%s", + fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber()); - File localBasePath = new File(getNextStoragePath(), childPath); - URI remoteBasePath = - remoteForStDirectory != null ? remoteForStDirectory.resolve(childPath) : null; + File localBasePath = + new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath); + Path remoteBasePath = + remoteForStDirectory != null + ? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath) + : null; final OpaqueMemoryResource<ForStSharedResources> sharedResources = ForStOperationUtils.allocateSharedCachesIfConfigured( @@ -570,7 +566,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend private ForStResourceContainer createOptionsAndResourceContainer( @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, @Nullable File localBasePath, - @Nullable URI remoteBasePath, + @Nullable Path remoteBasePath, boolean enableStatistics) { return new ForStResourceContainer( diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java index bec699931c6..a3d4e7fa979 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.util.function.ThrowingRunnable; @@ -43,7 +44,6 @@ import org.rocksdb.WriteOptions; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; -import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -303,18 +303,18 @@ public class ForStResourceContainerTest { @Test public void testDirectoryResources() throws Exception { File localBasePath = TMP_FOLDER.newFolder(); - URI remoteBasePath = TMP_FOLDER.newFolder().toURI(); + Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath()); try (final ForStResourceContainer optionsContainer = new ForStResourceContainer( new Configuration(), null, null, localBasePath, remoteBasePath, false)) { optionsContainer.prepareDirectories(); assertTrue(localBasePath.exists()); - assertTrue(new File(remoteBasePath).exists()); + assertTrue(new File(remoteBasePath.getPath()).exists()); assertTrue(optionsContainer.getDbOptions().getEnv() instanceof FlinkEnv); optionsContainer.clearDirectories(); assertFalse(localBasePath.exists()); - assertFalse(new File(remoteBasePath).exists()); + assertFalse(new File(remoteBasePath.getPath()).exists()); } } }