1996fanrui commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1427612626


##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##########
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend<Integer> keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
   Could we let RocksDB create the LOG file itself and then call close to clean 
it up?
   
   If so, we don't need to call `resolveRelocatedDbLogPrefix` in the test. The 
current test just creates a file based on `resolveRelocatedDbLogPrefix` and then 
cleans it up based on `resolveRelocatedDbLogPrefix`.
   
   If `resolveRelocatedDbLogPrefix` has a bug, or its rule isn't the same as the 
one on the RocksDB side, this test cannot catch it.
   
   Also, if we let RocksDB create the LOG file, the test will fail if RocksDB's 
log file naming rule changes in the future, so we would notice the change.
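   
   To illustrate what I mean, here is a rough, self-contained sketch (not the 
actual Flink code). `resolvePrefix` and `cleanByPrefix` are hypothetical 
stand-ins, and the hard-coded file names are only my assumption of what RocksDB 
would write for the DB path `/tmp/job_1/db`. In the real test RocksDB itself 
should produce these files. The point is that the expected names do not come 
from the helper under test:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class RelocatedLogCleanupSketch {

    // Hypothetical stand-in for the prefix rule under test (an assumption, not Flink's code):
    // drop a leading '/', replace every char outside [a-zA-Z0-9-._] with '_', append "_LOG".
    static String resolvePrefix(String absoluteDbPath) {
        String trimmed =
                absoluteDbPath.startsWith("/") ? absoluteDbPath.substring(1) : absoluteDbPath;
        return trimmed.replaceAll("[^a-zA-Z0-9\\-._]", "_") + "_LOG";
    }

    // Deletes every regular file in logDir whose name starts with prefix; returns the count.
    static int cleanByPrefix(Path logDir, String prefix) throws IOException {
        int deleted = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(logDir)) {
            for (Path file : files) {
                // Compare the file name as a String, not as a Path.
                if (Files.isRegularFile(file)
                        && file.getFileName().toString().startsWith(prefix)) {
                    Files.delete(file);
                    deleted++;
                }
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Files.createTempDirectory("db_logs");
        // Names written down independently of resolvePrefix (assumed RocksDB output).
        Files.createFile(logDir.resolve("tmp_job_1_db_LOG"));
        Files.createFile(logDir.resolve("tmp_job_1_db_LOG.old.1700000000"));
        // Unrelated file that must survive the cleanup.
        Files.createFile(logDir.resolve("taskManager.log"));

        int deleted = cleanByPrefix(logDir, resolvePrefix("/tmp/job_1/db"));
        System.out.println("deleted=" + deleted);
    }
}
```

   If the prefix rule and the independently produced names ever disagree, the 
delete count drops and the check fails, which is the coverage the current test 
is missing.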
   
   WDYT?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java:
##########
@@ -438,4 +451,44 @@ private File resolveFileLocation(String logFilePath) {
         File logFile = new File(logFilePath);
         return (logFile.exists() && logFile.canRead()) ? logFile : null;
     }
+
+    /** Clean all relocated rocksdb logs. */
+    private void cleanRelocatedDbLogs() {
+        if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) {
+            LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir);
+
+            String relocatedDbLogPrefix =
+                    resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath());
+            try {
+                Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                        .filter(
+                                path ->
+                                        !Files.isDirectory(path)
+                                                && path.toFile()
+                                                        .getName()
+                                                        .startsWith(relocatedDbLogPrefix))
+                        .forEach(IOUtils::deleteFileQuietly);
+            } catch (IOException e) {
+                LOG.warn(
+                        "Could not list relocated RocksDB log directory: {}",
+                        relocatedDbLogBaseDir);
+            }
+        }
+    }
+
+    /**
+     * Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See
+     * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30.
+     *
+     * @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located.
+     * @return Resolved rocksdb log name prefix.
+     */
+    public static String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) {

Review Comment:
   Based on the previous comment, I'd prefer this method to be private.
   
   Also, if it ends up being a public method, please add the 
`@VisibleForTesting` annotation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
