This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30d75f1511b2b2beac4d7f18ee20941b3bd4cbe8
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Mon May 27 20:46:03 2024 +0800

    [hotfix][statebackend] Normalize ForSt working dir
    
    Co-authored-by: yhx <master...@gmail.com>
---
 .../flink/state/forst/ForStKeyedStateBackend.java  |  4 +-
 .../state/forst/ForStKeyedStateBackendBuilder.java | 18 +++--
 .../flink/state/forst/ForStResourceContainer.java  | 78 ++++++++++++++--------
 .../flink/state/forst/ForStStateBackend.java       | 30 ++++-----
 .../state/forst/ForStResourceContainerTest.java    |  8 +--
 5 files changed, 83 insertions(+), 55 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
index 5192536a8dc..c5ec58bccc0 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.state.forst;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.asyncprocessing.StateExecutor;
@@ -43,7 +44,6 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.function.Function;
@@ -231,7 +231,7 @@ public class ForStKeyedStateBackend<K> implements 
AsyncKeyedStateBackend {
     }
 
     @VisibleForTesting
-    URI getRemoteBasePath() {
+    Path getRemoteBasePath() {
         return optionsContainer.getRemoteBasePath();
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
index d3fad83c13f..5a52d9dad6a 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java
@@ -37,13 +37,13 @@ import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
 import org.rocksdb.RocksDB;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.File;
 import java.util.Collection;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -180,11 +180,21 @@ public class ForStKeyedStateBackendBuilder<K>
     }
 
     private ForStRestoreOperation getForStRestoreOperation() {
-        DBOptions dbOptions = optionsContainer.getDbOptions();
+        // Currently, ForStDB does not support mixing local-dir and remote-dir, and when using
+        // the Flink env, ForStDB concatenates the DFS directory with the local directory to
+        // form the working dir. We want to use either the DFS directory (in the Flink env) or
+        // the local directory directly as the working dir. Until ForStDB implements this, we
+        // work around it by setting the dbPath to "/" whenever a DFS directory is configured.
+        // TODO: use localForStPath as dbPath once ForSt supports mixing local-dir and remote-dir
+        File instanceForStPath =
+                optionsContainer.getRemoteForStPath() == null
+                        ? optionsContainer.getLocalForStPath()
+                        : new File("/");
+
         if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
             return new ForStNoneRestoreOperation(
-                    optionsContainer.getLocalForStPath(),
-                    dbOptions,
+                    instanceForStPath,
+                    optionsContainer.getDbOptions(),
                     columnFamilyOptionsFactory,
                     nativeMetricOptions,
                     metricGroup);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
index 2b912ca9732..a5644c3d3b7 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
@@ -48,9 +49,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -71,9 +70,9 @@ public final class ForStResourceContainer implements 
AutoCloseable {
 
     private static final String DB_DIR_STRING = "db";
 
-    @Nullable private final URI remoteBasePath;
+    @Nullable private final Path remoteBasePath;
 
-    @Nullable private final URI remoteForStPath;
+    @Nullable private final Path remoteForStPath;
 
     @Nullable private final File localBasePath;
 
@@ -97,7 +96,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
-    @Nullable private Path relocatedDbLogBaseDir;
+    @Nullable private java.nio.file.Path relocatedDbLogBaseDir;
 
     @VisibleForTesting
     public ForStResourceContainer() {
@@ -121,7 +120,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
             @Nullable ForStOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources,
             @Nullable File localBasePath,
-            @Nullable URI remoteBasePath,
+            @Nullable Path remoteBasePath,
             boolean enableStatistics) {
 
         this.configuration = configuration;
@@ -132,7 +131,7 @@ public final class ForStResourceContainer implements 
AutoCloseable {
         this.localForStPath = localBasePath != null ? new File(localBasePath, 
DB_DIR_STRING) : null;
         this.remoteBasePath = remoteBasePath;
         this.remoteForStPath =
-                remoteBasePath != null ? remoteBasePath.resolve(DB_DIR_STRING) 
: null;
+                remoteBasePath != null ? new Path(remoteBasePath, 
DB_DIR_STRING) : null;
 
         this.enableStatistics = enableStatistics;
         this.handlesToClose = new ArrayList<>();
@@ -259,10 +258,15 @@ public final class ForStResourceContainer implements 
AutoCloseable {
     }
 
     @Nullable
-    public URI getRemoteBasePath() {
+    public Path getRemoteBasePath() {
         return remoteBasePath;
     }
 
+    @Nullable
+    public Path getRemoteForStPath() {
+        return remoteForStPath;
+    }
+
     /**
      * Prepare local and remote directories.
      *
@@ -273,28 +277,27 @@ public final class ForStResourceContainer implements 
AutoCloseable {
             prepareDirectories(remoteBasePath, remoteForStPath);
         }
         if (localBasePath != null && localForStPath != null) {
-            prepareDirectories(new URI(localBasePath.getPath()), new 
URI(localForStPath.getPath()));
+            prepareDirectories(
+                    new Path(localBasePath.getPath()), new 
Path(localForStPath.getPath()));
         }
     }
 
-    private static void prepareDirectories(URI basePath, URI dbPath) throws 
IOException {
-        FileSystem fileSystem = FileSystem.get(basePath);
-        org.apache.flink.core.fs.Path tempBasePath = new 
org.apache.flink.core.fs.Path(basePath),
-                tempDBPath = new org.apache.flink.core.fs.Path(dbPath);
-        if (fileSystem.exists(tempBasePath)) {
-            if (!fileSystem.getFileStatus(tempBasePath).isDir()) {
-                throw new IOException("Not a directory: " + tempBasePath);
+    private static void prepareDirectories(Path basePath, Path dbPath) throws 
IOException {
+        FileSystem fileSystem = basePath.getFileSystem();
+        if (fileSystem.exists(basePath)) {
+            if (!fileSystem.getFileStatus(basePath).isDir()) {
+                throw new IOException("Not a directory: " + basePath);
             }
-        } else if (!fileSystem.mkdirs(tempBasePath)) {
+        } else if (!fileSystem.mkdirs(basePath)) {
             throw new IOException(
-                    String.format("Could not create ForSt directory at %s.", 
tempBasePath));
+                    String.format("Could not create ForSt directory at %s.", 
basePath));
         }
-        if (fileSystem.exists(tempDBPath)) {
-            fileSystem.delete(tempDBPath, true);
+        if (fileSystem.exists(dbPath)) {
+            fileSystem.delete(dbPath, true);
         }
-        if (!fileSystem.mkdirs(tempDBPath)) {
+        if (!fileSystem.mkdirs(dbPath)) {
             throw new IOException(
-                    String.format("Could not create ForSt db directory at 
%s.", tempDBPath));
+                    String.format("Could not create ForSt db directory at 
%s.", dbPath));
         }
     }
 
@@ -308,15 +311,14 @@ public final class ForStResourceContainer implements 
AutoCloseable {
             clearDirectories(remoteBasePath);
         }
         if (localBasePath != null) {
-            clearDirectories(new URI(localBasePath.getPath()));
+            clearDirectories(new Path(localBasePath.getPath()));
         }
     }
 
-    private static void clearDirectories(URI basePath) throws IOException {
-        FileSystem fileSystem = FileSystem.get(basePath);
-        org.apache.flink.core.fs.Path tempBasePath = new 
org.apache.flink.core.fs.Path(basePath);
-        if (fileSystem.exists(tempBasePath)) {
-            fileSystem.delete(tempBasePath, true);
+    private static void clearDirectories(Path basePath) throws IOException {
+        FileSystem fileSystem = basePath.getFileSystem();
+        if (fileSystem.exists(basePath)) {
+            fileSystem.delete(basePath, true);
         }
     }
 
@@ -493,7 +495,27 @@ public final class ForStResourceContainer implements 
AutoCloseable {
                 String relocatedDbLogDir = logFile.getParent();
                 this.relocatedDbLogBaseDir = new 
File(relocatedDbLogDir).toPath();
                 dbOptions.setDbLogDir(relocatedDbLogDir);
+            } else {
+                setLocalForStPathAsLogDir(dbOptions);
             }
+        } else {
+            setLocalForStPathAsLogDir(dbOptions);
+        }
+    }
+
+    private void setLocalForStPathAsLogDir(DBOptions dbOptions) {
+        // Currently, ForStDB does not support mixing local-dir and remote-dir, and when using
+        // the Flink env, ForStDB concatenates the DFS directory with the local directory to
+        // form the working dir. We want to use either the DFS directory (in the Flink env) or
+        // the local directory directly as the working dir. Until ForStDB implements this, we
+        // work around it by setting the dbPath to "/" whenever a DFS directory is configured.
+        // Another problem is that when the system property "log.file" is not set, ForSt uses
+        // the instance path as the log dir, which results in "/" being used as the log
+        // directory. This often causes permission issues, so the db log dir is temporarily set
+        // explicitly here.
+        // TODO: remove this method once ForSt handles the log dir properly
+        if (localForStPath != null) {
+            this.relocatedDbLogBaseDir = localForStPath.toPath();
+            dbOptions.setDbLogDir(localForStPath.getPath());
         }
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
index c2521772b44..7528b4d5265 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java
@@ -50,7 +50,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -90,7 +89,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
      * configuration values will be used. The configuration will fallback to 
local directory by
      * default. TODO: fallback to checkpoint directory if not configured.
      */
-    @Nullable private URI remoteForStDirectory;
+    @Nullable private Path remoteForStDirectory;
 
     /**
      * Base paths for ForSt directory, as configured. Null if not yet set, in 
which case the
@@ -154,15 +153,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
             this.remoteForStDirectory = original.remoteForStDirectory;
         } else {
             String remoteDirStr = config.get(ForStOptions.REMOTE_DIRECTORY);
-            try {
-                this.remoteForStDirectory = remoteDirStr == null ? null : new 
URI(remoteDirStr);
-            } catch (URISyntaxException e) {
-                throw new RuntimeException(
-                        String.format(
-                                "Exception when transform %s to URI, the value 
is: %s",
-                                ForStOptions.REMOTE_DIRECTORY.key(), 
remoteDirStr),
-                        e);
-            }
+            this.remoteForStDirectory = remoteDirStr == null ? null : new 
Path(remoteDirStr);
         }
 
         // configure local directories
@@ -308,12 +299,17 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
 
         lazyInitializeForJob(env, fileCompatibleIdentifier);
 
-        String childPath =
-                "job_" + jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" 
+ UUID.randomUUID();
+        String opChildPath =
+                String.format(
+                        "op_%s_attempt_%s",
+                        fileCompatibleIdentifier, 
env.getTaskInfo().getAttemptNumber());
 
-        File localBasePath = new File(getNextStoragePath(), childPath);
-        URI remoteBasePath =
-                remoteForStDirectory != null ? 
remoteForStDirectory.resolve(childPath) : null;
+        File localBasePath =
+                new File(new File(getNextStoragePath(), jobId.toHexString()), 
opChildPath);
+        Path remoteBasePath =
+                remoteForStDirectory != null
+                        ? new Path(new Path(remoteForStDirectory, 
jobId.toHexString()), opChildPath)
+                        : null;
 
         final OpaqueMemoryResource<ForStSharedResources> sharedResources =
                 ForStOperationUtils.allocateSharedCachesIfConfigured(
@@ -570,7 +566,7 @@ public class ForStStateBackend extends 
AbstractManagedMemoryStateBackend
     private ForStResourceContainer createOptionsAndResourceContainer(
             @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources,
             @Nullable File localBasePath,
-            @Nullable URI remoteBasePath,
+            @Nullable Path remoteBasePath,
             boolean enableStatistics) {
 
         return new ForStResourceContainer(
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
index bec699931c6..a3d4e7fa979 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.state.forst;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.function.ThrowingRunnable;
 
@@ -43,7 +44,6 @@ import org.rocksdb.WriteOptions;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -303,18 +303,18 @@ public class ForStResourceContainerTest {
     @Test
     public void testDirectoryResources() throws Exception {
         File localBasePath = TMP_FOLDER.newFolder();
-        URI remoteBasePath = TMP_FOLDER.newFolder().toURI();
+        Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath());
         try (final ForStResourceContainer optionsContainer =
                 new ForStResourceContainer(
                         new Configuration(), null, null, localBasePath, 
remoteBasePath, false)) {
             optionsContainer.prepareDirectories();
             assertTrue(localBasePath.exists());
-            assertTrue(new File(remoteBasePath).exists());
+            assertTrue(new File(remoteBasePath.getPath()).exists());
             assertTrue(optionsContainer.getDbOptions().getEnv() instanceof 
FlinkEnv);
 
             optionsContainer.clearDirectories();
             assertFalse(localBasePath.exists());
-            assertFalse(new File(remoteBasePath).exists());
+            assertFalse(new File(remoteBasePath.getPath()).exists());
         }
     }
 }

Reply via email to