This is an automated email from the ASF dual-hosted git repository. tangyun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bdb0233eb83 [FLINK-31743][statebackend/rocksdb] disable rocksdb log relocating when instance path too long bdb0233eb83 is described below commit bdb0233eb83629c3bb1b1487057b22891c41c437 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Fri Apr 21 23:43:31 2023 +0800 [FLINK-31743][statebackend/rocksdb] disable rocksdb log relocating when instance path too long This closes #22458. --- .../state/EmbeddedRocksDBStateBackend.java | 10 +++++--- .../state/RocksDBKeyedStateBackendBuilder.java | 6 ++++- .../streaming/state/RocksDBResourceContainer.java | 30 +++++++++++++++++++--- .../streaming/state/RocksDBStateBackend.java | 2 +- .../state/EmbeddedRocksDBStateBackendTest.java | 5 ++-- .../state/RocksDBStateBackendConfigTest.java | 19 ++++++++++++-- 6 files changed, 59 insertions(+), 13 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index e65d925f453..94ea783f670 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -473,7 +473,9 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke } final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer( - sharedResources, nativeMetricOptions.isStatisticsEnabled()); + sharedResources, + instanceBasePath, + nativeMetricOptions.isStatisticsEnabled()); ExecutionConfig executionConfig = env.getExecutionConfig(); StreamCompressionDecorator keyGroupCompressionDecorator = @@ -875,13 +877,14 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke } @VisibleForTesting - RocksDBResourceContainer createOptionsAndResourceContainer() { - return createOptionsAndResourceContainer(null, false); + RocksDBResourceContainer createOptionsAndResourceContainer(@Nullable File instanceBasePath) { + return createOptionsAndResourceContainer(null, instanceBasePath, false); } @VisibleForTesting private RocksDBResourceContainer createOptionsAndResourceContainer( @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources, + @Nullable File instanceBasePath, boolean enableStatistics) { return new RocksDBResourceContainer( @@ -889,6 +892,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT, rocksDbOptionsFactory, sharedResources, + instanceBasePath, enableStatistics); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 7007f1b5463..3df43d7224e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -166,7 +166,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory); this.optionsContainer = optionsContainer; this.instanceBasePath = instanceBasePath; - this.instanceRocksDBPath = new File(instanceBasePath, DB_INSTANCE_DIR_STRING); + this.instanceRocksDBPath = getInstanceRocksDBPath(instanceBasePath); this.metricGroup = metricGroup; this.enableIncrementalCheckpointing = false; this.nativeMetricOptions = new RocksDBNativeMetricOptions(); @@ -264,6 +264,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken return this; } + public static File getInstanceRocksDBPath(File instanceBasePath) { + return new File(instanceBasePath, DB_INSTANCE_DIR_STRING); + } + private static void checkAndCreateDirectory(File directory) throws IOException { if (directory.exists()) { if (!directory.isDirectory()) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index ed652980d04..74cfdb0d24c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -59,6 +59,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public final class RocksDBResourceContainer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class); + // the filename length limit is 255 on most operating systems + private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - "_LOG".length(); + + @Nullable private final File instanceRocksDBPath; + /** The configurations from file. */ private final ReadableConfig configuration; @@ -82,13 +87,13 @@ public final class RocksDBResourceContainer implements AutoCloseable { @VisibleForTesting public RocksDBResourceContainer() { - this(new Configuration(), PredefinedOptions.DEFAULT, null, null, false); + this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null, false); } @VisibleForTesting public RocksDBResourceContainer( PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory) { - this(new Configuration(), predefinedOptions, optionsFactory, null, false); + this(new Configuration(), predefinedOptions, optionsFactory, null, null, false); } @VisibleForTesting @@ -96,7 +101,7 @@ public final class RocksDBResourceContainer implements AutoCloseable { PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) { - this(new Configuration(), predefinedOptions, optionsFactory, sharedResources, false); + this(new Configuration(), predefinedOptions, optionsFactory, sharedResources, null, false); } public RocksDBResourceContainer( @@ -104,12 +109,19 @@ public final class RocksDBResourceContainer implements AutoCloseable { PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources, + @Nullable File instanceBasePath, boolean enableStatistics) { this.configuration = configuration; this.predefinedOptions = checkNotNull(predefinedOptions); this.optionsFactory = optionsFactory; this.sharedResources = sharedResources; + + this.instanceRocksDBPath = + instanceBasePath != null + ? RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath) + : null; + this.enableStatistics = enableStatistics; this.handlesToClose = new ArrayList<>(); } @@ -314,7 +326,17 @@ public final class RocksDBResourceContainer implements AutoCloseable { String logDir = internalGetOption(RocksDBConfigurableOptions.LOG_DIR); if (logDir == null || logDir.isEmpty()) { - relocateDefaultDbLogDir(currentOptions); + if (instanceRocksDBPath == null + || instanceRocksDBPath.getAbsolutePath().length() + <= INSTANCE_PATH_LENGTH_LIMIT) { + relocateDefaultDbLogDir(currentOptions); + } else { + // disable log relocate when instance path length exceeds limit to prevent rocksdb + // log file creation failure, details in FLINK-31743 + LOG.warn( + "RocksDB instance path length exceeds limit : {}, disable log relocate.", + instanceRocksDBPath); + } } else { currentOptions.setDbLogDir(logDir); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 20be1e085f0..1275a5aa928 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -575,7 +575,7 @@ public class RocksDBStateBackend extends AbstractManagedMemoryStateBackend @VisibleForTesting RocksDBResourceContainer createOptionsAndResourceContainer() { - return rocksDBStateBackend.createOptionsAndResourceContainer(); + return rocksDBStateBackend.createOptionsAndResourceContainer(null); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 46788406c16..4d6b533d662 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -86,7 +86,6 @@ import java.util.Queue; import java.util.concurrent.RunnableFuture; import static junit.framework.TestCase.assertNotNull; -import static org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.DB_INSTANCE_DIR_STRING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -148,7 +147,9 @@ public class EmbeddedRocksDBStateBackendTest private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); public void prepareRocksDB() throws Exception { - String dbPath = new File(TEMP_FOLDER.newFolder(), DB_INSTANCE_DIR_STRING).getAbsolutePath(); + String dbPath = + RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(TEMP_FOLDER.newFolder()) + .getAbsolutePath(); ColumnFamilyOptions columnOptions = optionsContainer.getColumnOptions(); ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index e16d1504812..36e0d3c2bce 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -101,7 +101,7 @@ public class RocksDBStateBackendConfigTest { final File logFile = File.createTempFile(getClass().getSimpleName() + "-", ".log"); // set the environment variable 'log.file' with the Flink log file location System.setProperty("log.file", logFile.getPath()); - try (RocksDBResourceContainer container = backend.createOptionsAndResourceContainer()) { + try (RocksDBResourceContainer container = backend.createOptionsAndResourceContainer(null)) { assertEquals( RocksDBConfigurableOptions.LOG_LEVEL.defaultValue(), container.getDbOptions().infoLogLevel()); @@ -109,6 +109,19 @@ public class RocksDBStateBackendConfigTest { } finally { logFile.delete(); } + + StringBuilder longInstanceBasePath = + new StringBuilder(tempFolder.newFolder().getAbsolutePath()); + while (longInstanceBasePath.length() < 255) { + longInstanceBasePath.append("/append-for-long-path"); + } + try (RocksDBResourceContainer container = + backend.createOptionsAndResourceContainer( + new File(longInstanceBasePath.toString()))) { + assertTrue(container.getDbOptions().dbLogDir().isEmpty()); + } finally { + logFile.delete(); + } } // ------------------------------------------------------------------------ @@ -531,7 +544,7 @@ public class RocksDBStateBackendConfigTest { try (RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer( - configuration, PredefinedOptions.DEFAULT, null, null, false)) { + configuration, PredefinedOptions.DEFAULT, null, null, null, false)) { DBOptions dbOptions = optionsContainer.getDbOptions(); assertEquals(-1, dbOptions.maxOpenFiles()); @@ -614,6 +627,7 @@ public class RocksDBStateBackendConfigTest { PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null, + null, false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions(); @@ -627,6 +641,7 @@ public class RocksDBStateBackendConfigTest { PredefinedOptions.SPINNING_DISK_OPTIMIZED, null, null, + null, false)) { final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();