This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 398bb506870  [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. (#23922)
398bb506870 is described below

commit 398bb5068703a6461e2e52a90c97078e4263059a
Author: liming.1018 <69444450+limin...@users.noreply.github.com>
AuthorDate: Wed Mar 13 11:55:52 2024 +0800

     [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. (#23922)
---
 .../streaming/state/RocksDBResourceContainer.java  | 57 +++++++++++++++++++++-
 .../state/RocksDBStateBackendConfigTest.java       | 57 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index ae4298c8e38..c74a41cb7a1 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -44,7 +45,11 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -59,8 +64,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public final class RocksDBResourceContainer implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBResourceContainer.class);
 
+    private static final String ROCKSDB_RELOCATE_LOG_SUFFIX = "_LOG";
+
     // the filename length limit is 255 on most operating systems
-    private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - 
"_LOG".length();
+    private static final int INSTANCE_PATH_LENGTH_LIMIT =
+            255 - ROCKSDB_RELOCATE_LOG_SUFFIX.length();
 
     @Nullable private final File instanceRocksDBPath;
 
@@ -85,6 +93,8 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
     /** The handles to be closed when the container is closed. */
     private final ArrayList<AutoCloseable> handlesToClose;
 
+    @Nullable private Path relocatedDbLogBaseDir;
+
     @VisibleForTesting
     public RocksDBResourceContainer() {
         this(new Configuration(), PredefinedOptions.DEFAULT, null, null, null, 
false);
@@ -267,6 +277,7 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
         if (sharedResources != null) {
             sharedResources.close();
         }
+        cleanRelocatedDbLogs();
     }
 
     /**
@@ -426,7 +437,9 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
         if (logFilePath != null) {
             File logFile = resolveFileLocation(logFilePath);
             if (logFile != null && resolveFileLocation(logFile.getParent()) != 
null) {
-                dbOptions.setDbLogDir(logFile.getParent());
+                String relocatedDbLogDir = logFile.getParent();
+                this.relocatedDbLogBaseDir = new 
File(relocatedDbLogDir).toPath();
+                dbOptions.setDbLogDir(relocatedDbLogDir);
             }
         }
     }
@@ -441,4 +454,44 @@ public final class RocksDBResourceContainer implements 
AutoCloseable {
         File logFile = new File(logFilePath);
         return (logFile.exists() && logFile.canRead()) ? logFile : null;
     }
+
+    /** Deletes this instance's relocated RocksDB logs; listing failures are only logged. */
+    private void cleanRelocatedDbLogs() {
+        if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) {
+            LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir);
+
+            // Relocated log file names start with the flattened instance path plus "_LOG".
+            String prefix = resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath());
+            try {
+                Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                        .filter(
+                                path ->
+                                        !Files.isDirectory(path)
+                                                && path.toFile().getName().startsWith(prefix))
+                        .forEach(IOUtils::deleteFileQuietly);
+            } catch (IOException e) {
+                LOG.warn(
+                        "Could not list relocated RocksDB log directory: {}",
+                        relocatedDbLogBaseDir,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Resolves the relocated rocksdb log file name prefix byte-wise (assumes UTF-8 paths), see
+     * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30.
+     */
+    private static String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) {
+        byte[] raw = instanceRocksDBAbsolutePath.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        StringBuilder prefix = new StringBuilder();
+        for (int i = 0; i < raw.length; i++) {
+            char c = (char) (raw[i] & 0xFF);
+            boolean allowed = c < 128 && (Character.isLetterOrDigit(c) || "-._".indexOf(c) >= 0);
+            if (allowed || i > 0) { // a disallowed first byte is dropped, later ones become '_'
+                prefix.append(allowed ? c : '_');
+            }
+        }
+        return prefix.append(ROCKSDB_RELOCATE_LOG_SUFFIX).toString();
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 79869038bd7..484b24e4dc5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FileSystem;
@@ -47,13 +48,16 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Timeout;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.BloomFilter;
@@ -61,10 +65,12 @@ import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.util.SizeUnit;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -367,6 +373,57 @@ public class RocksDBStateBackendConfigTest {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    /** Uses a JUnit 4 timeout, since {@link Timeout} (JUnit 5) is ignored by this JUnit 4 test. */
+    @Test(timeout = 60_000L)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        // RocksDB logs are expected to be relocated next to "log.file"; restored in finally.
+        final String previousLogFile = System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        rocksDbBackend.setDbStoragePath(new Path(folder.toURI().toString()).toString());
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend<Integer> keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        try {
+            File instanceRocksDBPath =
+                    RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(
+                            keyedBackend.getInstanceBasePath());
+            // Skip if the path is too long for relocation; finally still releases resources.
+            Assume.assumeTrue(
+                    instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+            while (FileUtils.listDirectory(relocatedDBLogDir.toPath()).length <= 2) {
+                // If the default number of log files in rocksdb is not enough, add more logs.
+                try (FlushOptions flushOptions = new FlushOptions()) {
+                    keyedBackend.db.put(RandomUtils.nextBytes(32), RandomUtils.nextBytes(512));
+                    keyedBackend.db.flush(flushOptions);
+                }
+            }
+        } finally {
+            if (previousLogFile == null) {
+                System.clearProperty("log.file");
+            } else {
+                System.setProperty("log.file", previousLogFile);
+            }
+            IOUtils.closeQuietly(keyedBackend);
+            keyedBackend.dispose();
+            env.close();
+        }
+
+        final java.nio.file.Path[] remaining = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+        assertEquals(1, remaining.length);
+        assertEquals("taskManager.log", remaining[0].toFile().getName());
+    }
+
     // ------------------------------------------------------------------------
     //  RocksDB local file automatic from temp directories
     // ------------------------------------------------------------------------

Reply via email to