1996fanrui commented on code in PR #23922: URL: https://github.com/apache/flink/pull/23922#discussion_r1427612626
########## flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java: ########## @@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception { rocksDbBackend.setDbStoragePath("relative/path"); } + @Test + public void testCleanRelocatedDbLogs() throws Exception { + final File folder = tempFolder.newFolder(); + final File relocatedDBLogDir = tempFolder.newFolder("db_logs"); + final File logFile = new File(relocatedDBLogDir, "taskManager.log"); + Files.createFile(logFile.toPath()); + System.setProperty("log.file", logFile.getAbsolutePath()); + + final String dbStoragePath = new Path(folder.toURI().toString()).toString(); + final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend(); + rocksDbBackend.setDbStoragePath(dbStoragePath); + + final MockEnvironment env = getMockEnvironment(tempFolder.newFolder()); + RocksDBKeyedStateBackend<Integer> keyedBackend = + createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE); + // clear unused file + FileUtils.deleteFileOrDirectory(logFile); + + File instanceBasePath = keyedBackend.getInstanceBasePath(); + File instanceRocksDBPath = + RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath); + + // avoid tests without relocate. + Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length()); + + String relocatedDbLogPrefix = + RocksDBResourceContainer.resolveRelocatedDbLogPrefix( + instanceRocksDBPath.getAbsolutePath()); + java.nio.file.Path[] relocatedDbLogs; + try { + relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath()); + assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix)); + // add a rolled log file + Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix"); Review Comment: Could we let rocksdb create LOG file and calling the close to clean it? If so, we don't need to call the `resolveRelocatedDbLogPrefix`. Current test just tests create an file based on `resolveRelocatedDbLogPrefix`, and clean it based on `resolveRelocatedDbLogPrefix`. If `resolveRelocatedDbLogPrefix` has some bugs or its rule isn't same with rocksdb side. This test cannot cover it. Also, if we let rocksdb create LOG file, current test will fail if rocksdb log file rule is changed in the future. WDYT? ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java: ########## @@ -438,4 +451,44 @@ private File resolveFileLocation(String logFilePath) { File logFile = new File(logFilePath); return (logFile.exists() && logFile.canRead()) ? logFile : null; } + + /** Clean all relocated rocksdb logs. */ + private void cleanRelocatedDbLogs() { + if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) { + LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir); + + String relocatedDbLogPrefix = + resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath()); + try { + Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir)) + .filter( + path -> + !Files.isDirectory(path) + && path.toFile() + .getName() + .startsWith(relocatedDbLogPrefix)) + .forEach(IOUtils::deleteFileQuietly); + } catch (IOException e) { + LOG.warn( + "Could not list relocated RocksDB log directory: {}", + relocatedDbLogBaseDir); + } + } + } + + /** + * Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See + * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30. + * + * @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located. + * @return Resolved rocksdb log name prefix. + */ + public static String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) { Review Comment: Based on the last comment, I prefer this method is a private method. Also, if it's public method in the end, please add `@VisibleForTesting` annotation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org