This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb0233eb83 [FLINK-31743][statebackend/rocksdb] disable rocksdb log relocating when instance path too long
bdb0233eb83 is described below

commit bdb0233eb83629c3bb1b1487057b22891c41c437
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Fri Apr 21 23:43:31 2023 +0800

    [FLINK-31743][statebackend/rocksdb] disable rocksdb log relocating when instance path too long
    
    This closes #22458.
---
 .../state/EmbeddedRocksDBStateBackend.java         | 10 +++++---
 .../state/RocksDBKeyedStateBackendBuilder.java     |  6 ++++-
 .../streaming/state/RocksDBResourceContainer.java  | 30 +++++++++++++++++++---
 .../streaming/state/RocksDBStateBackend.java       |  2 +-
 .../state/EmbeddedRocksDBStateBackendTest.java     |  5 ++--
 .../state/RocksDBStateBackendConfigTest.java       | 19 ++++++++++++--
 6 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index e65d925f453..94ea783f670 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -473,7 +473,9 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
         }
         final RocksDBResourceContainer resourceContainer =
                 createOptionsAndResourceContainer(
-                        sharedResources, 
nativeMetricOptions.isStatisticsEnabled());
+                        sharedResources,
+                        instanceBasePath,
+                        nativeMetricOptions.isStatisticsEnabled());
 
         ExecutionConfig executionConfig = env.getExecutionConfig();
         StreamCompressionDecorator keyGroupCompressionDecorator =
@@ -875,13 +877,14 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
     }
 
     @VisibleForTesting
-    RocksDBResourceContainer createOptionsAndResourceContainer() {
-        return createOptionsAndResourceContainer(null, false);
+    RocksDBResourceContainer createOptionsAndResourceContainer(@Nullable File 
instanceBasePath) {
+        return createOptionsAndResourceContainer(null, instanceBasePath, 
false);
     }
 
     @VisibleForTesting
     private RocksDBResourceContainer createOptionsAndResourceContainer(
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources,
+            @Nullable File instanceBasePath,
             boolean enableStatistics) {
 
         return new RocksDBResourceContainer(
@@ -889,6 +892,7 @@ public class EmbeddedRocksDBStateBackend extends 
AbstractManagedMemoryStateBacke
                 predefinedOptions != null ? predefinedOptions : 
PredefinedOptions.DEFAULT,
                 rocksDbOptionsFactory,
                 sharedResources,
+                instanceBasePath,
                 enableStatistics);
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 7007f1b5463..3df43d7224e 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -166,7 +166,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
         this.columnFamilyOptionsFactory = 
Preconditions.checkNotNull(columnFamilyOptionsFactory);
         this.optionsContainer = optionsContainer;
         this.instanceBasePath = instanceBasePath;
-        this.instanceRocksDBPath = new File(instanceBasePath, 
DB_INSTANCE_DIR_STRING);
+        this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath);
         this.metricGroup = metricGroup;
         this.enableIncrementalCheckpointing = false;
         this.nativeMetricOptions = new RocksDBNativeMetricOptions();
@@ -264,6 +264,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
         return this;
     }
 
+    public static File getInstanceRocksDBPath(File instanceBasePath) {
+        return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
+    }
+
     private static void checkAndCreateDirectory(File directory) throws 
IOException {
         if (directory.exists()) {
             if (!directory.isDirectory()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index ed652980d04..74cfdb0d24c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -59,6 +59,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public final class RocksDBResourceContainer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBResourceContainer.class);
 
+    // the filename length limit is 255 on most operating systems
+    private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - 
"_LOG".length();
+
+    @Nullable private final File instanceRocksDBPath;
+
     /** The configurations from file. */
     private final ReadableConfig configuration;
 
@@ -82,13 +87,13 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
 
     @VisibleForTesting
     public RocksDBResourceContainer() {
-        this(new Configuration(), PredefinedOptions.DEFAULT, null, null, 
false);
+        this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null, 
false);
     }
 
     @VisibleForTesting
     public RocksDBResourceContainer(
             PredefinedOptions predefinedOptions, @Nullable 
RocksDBOptionsFactory optionsFactory) {
-        this(new Configuration(), predefinedOptions, optionsFactory, null, 
false);
+        this(new Configuration(), predefinedOptions, optionsFactory, null, 
null, false);
     }
 
     @VisibleForTesting
@@ -96,7 +101,7 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
             PredefinedOptions predefinedOptions,
             @Nullable RocksDBOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources) {
-        this(new Configuration(), predefinedOptions, optionsFactory, 
sharedResources, false);
+        this(new Configuration(), predefinedOptions, optionsFactory, 
sharedResources, null, false);
     }
 
     public RocksDBResourceContainer(
@@ -104,12 +109,19 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
             PredefinedOptions predefinedOptions,
             @Nullable RocksDBOptionsFactory optionsFactory,
             @Nullable OpaqueMemoryResource<RocksDBSharedResources> 
sharedResources,
+            @Nullable File instanceBasePath,
             boolean enableStatistics) {
 
         this.configuration = configuration;
         this.predefinedOptions = checkNotNull(predefinedOptions);
         this.optionsFactory = optionsFactory;
         this.sharedResources = sharedResources;
+
+        this.instanceRocksDBPath =
+                instanceBasePath != null
+                        ? 
RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath)
+                        : null;
+
         this.enableStatistics = enableStatistics;
         this.handlesToClose = new ArrayList<>();
     }
@@ -314,7 +326,17 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
 
         String logDir = internalGetOption(RocksDBConfigurableOptions.LOG_DIR);
         if (logDir == null || logDir.isEmpty()) {
-            relocateDefaultDbLogDir(currentOptions);
+            if (instanceRocksDBPath == null
+                    || instanceRocksDBPath.getAbsolutePath().length()
+                            <= INSTANCE_PATH_LENGTH_LIMIT) {
+                relocateDefaultDbLogDir(currentOptions);
+            } else {
+                // disable log relocate when instance path length exceeds 
limit to prevent rocksdb
+                // log file creation failure, details in FLINK-31743
+                LOG.warn(
+                        "RocksDB instance path length exceeds limit : {}, 
disable log relocate.",
+                        instanceRocksDBPath);
+            }
         } else {
             currentOptions.setDbLogDir(logDir);
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 20be1e085f0..1275a5aa928 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -575,7 +575,7 @@ public class RocksDBStateBackend extends 
AbstractManagedMemoryStateBackend
 
     @VisibleForTesting
     RocksDBResourceContainer createOptionsAndResourceContainer() {
-        return rocksDBStateBackend.createOptionsAndResourceContainer();
+        return rocksDBStateBackend.createOptionsAndResourceContainer(null);
     }
 
     @Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 46788406c16..4d6b533d662 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -86,7 +86,6 @@ import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
 
 import static junit.framework.TestCase.assertNotNull;
-import static 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.DB_INSTANCE_DIR_STRING;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -148,7 +147,9 @@ public class EmbeddedRocksDBStateBackendTest
     private final RocksDBResourceContainer optionsContainer = new 
RocksDBResourceContainer();
 
     public void prepareRocksDB() throws Exception {
-        String dbPath = new File(TEMP_FOLDER.newFolder(), 
DB_INSTANCE_DIR_STRING).getAbsolutePath();
+        String dbPath =
+                
RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(TEMP_FOLDER.newFolder())
+                        .getAbsolutePath();
         ColumnFamilyOptions columnOptions = 
optionsContainer.getColumnOptions();
 
         ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index e16d1504812..36e0d3c2bce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -101,7 +101,7 @@ public class RocksDBStateBackendConfigTest {
         final File logFile = File.createTempFile(getClass().getSimpleName() + 
"-", ".log");
         // set the environment variable 'log.file' with the Flink log file 
location
         System.setProperty("log.file", logFile.getPath());
-        try (RocksDBResourceContainer container = 
backend.createOptionsAndResourceContainer()) {
+        try (RocksDBResourceContainer container = 
backend.createOptionsAndResourceContainer(null)) {
             assertEquals(
                     RocksDBConfigurableOptions.LOG_LEVEL.defaultValue(),
                     container.getDbOptions().infoLogLevel());
@@ -109,6 +109,19 @@ public class RocksDBStateBackendConfigTest {
         } finally {
             logFile.delete();
         }
+
+        StringBuilder longInstanceBasePath =
+                new StringBuilder(tempFolder.newFolder().getAbsolutePath());
+        while (longInstanceBasePath.length() < 255) {
+            longInstanceBasePath.append("/append-for-long-path");
+        }
+        try (RocksDBResourceContainer container =
+                backend.createOptionsAndResourceContainer(
+                        new File(longInstanceBasePath.toString()))) {
+            assertTrue(container.getDbOptions().dbLogDir().isEmpty());
+        } finally {
+            logFile.delete();
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -531,7 +544,7 @@ public class RocksDBStateBackendConfigTest {
 
             try (RocksDBResourceContainer optionsContainer =
                     new RocksDBResourceContainer(
-                            configuration, PredefinedOptions.DEFAULT, null, 
null, false)) {
+                            configuration, PredefinedOptions.DEFAULT, null, 
null, null, false)) {
 
                 DBOptions dbOptions = optionsContainer.getDbOptions();
                 assertEquals(-1, dbOptions.maxOpenFiles());
@@ -614,6 +627,7 @@ public class RocksDBStateBackendConfigTest {
                         PredefinedOptions.SPINNING_DISK_OPTIMIZED,
                         null,
                         null,
+                        null,
                         false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();
@@ -627,6 +641,7 @@ public class RocksDBStateBackendConfigTest {
                         PredefinedOptions.SPINNING_DISK_OPTIMIZED,
                         null,
                         null,
+                        null,
                         false)) {
 
             final ColumnFamilyOptions columnFamilyOptions = 
optionsContainer.getColumnOptions();

Reply via email to