Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui merged PR #24495:
URL: https://github.com/apache/flink/pull/24495

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui merged PR #24494:
URL: https://github.com/apache/flink/pull/24494
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui merged PR #24493:
URL: https://github.com/apache/flink/pull/24493
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996621199

@flinkbot run azure
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
flinkbot commented on PR #24495:
URL: https://github.com/apache/flink/pull/24495#issuecomment-1996344641

## CI report:

* d22b1166f0c742eda39ce0bcf214855683642668 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
flinkbot commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996344418

## CI report:

* 908a6fb7d7400b0a3fa76a09452b1cbc79e1a972 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
flinkbot commented on PR #24494:
URL: https://github.com/apache/flink/pull/24494#issuecomment-1996344542

## CI report:

* d9e9cbec2790dc80e276b04cef7c739410cdad04 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996342006

@flinkbot run azure
[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 opened a new pull request, #24494:
URL: https://github.com/apache/flink/pull/24494

Backporting https://github.com/apache/flink/pull/23922 to release-1.18
[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 opened a new pull request, #24493:
URL: https://github.com/apache/flink/pull/24493

Backporting #23922 to release-1.17
[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 opened a new pull request, #24495:
URL: https://github.com/apache/flink/pull/24495

Backporting https://github.com/apache/flink/pull/23922 to release-1.19
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui merged PR #23922:
URL: https://github.com/apache/flink/pull/23922
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430990459

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
I noticed that the path-length limit is being addressed in `frocksdb`, which would ensure that relocation always occurs, but the related PR has made no further progress:
https://github.com/ververica/frocksdb/pull/66
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430988502

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);

Review Comment:
Good idea, I will resolve it in the next commit.
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430987609

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
Sorry, I cannot guarantee that the length of `instanceRocksDBPath` will not exceed `255 - "_LOG".length()` in this test, which means that relocation may not occur.
Forcibly specifying an absolute path may cause problems on the test machine, such as the path not existing or insufficient permissions. Is there any other way to ensure that relocation will definitely happen?
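The guard in the quoted test reduces to simple arithmetic: with the four-character `_LOG` suffix, the instance path may be at most 255 - 4 = 251 characters for the test body to run. The sketch below uses hypothetical names (`RelocationPrecondition`, `MAX_INSTANCE_PATH_LENGTH`; neither is Flink API) to show one way of turning the `Assume.assumeTrue(...)` into a hard precondition, so that a too-long temp path fails the test loudly instead of silently skipping it, which is the concern raised in review. It only restates the test's own bound; it makes no claim about how Flink or RocksDB enforce it internally.

```java
/**
 * Hypothetical helper (not part of Flink): restates the bound used by the test's
 * Assume.assumeTrue(...) guard as an explicit, failing precondition.
 */
final class RelocationPrecondition {
    /** Suffix appended to the relocated log prefix, as in the quoted test. */
    static final String LOG_SUFFIX = "_LOG";

    /** Longest instance path the test accepts: 255 - "_LOG".length() = 251. */
    static final int MAX_INSTANCE_PATH_LENGTH = 255 - LOG_SUFFIX.length();

    private RelocationPrecondition() {}

    /** Same condition the test passes to Assume.assumeTrue. */
    static boolean fitsRelocationLimit(String instanceRocksDBAbsolutePath) {
        return instanceRocksDBAbsolutePath.length() <= MAX_INSTANCE_PATH_LENGTH;
    }

    /** Throws instead of skipping, so a too-long path is reported as a failure. */
    static void requireRelocationPossible(String instanceRocksDBAbsolutePath) {
        if (!fitsRelocationLimit(instanceRocksDBAbsolutePath)) {
            throw new IllegalStateException(
                    "Instance path has "
                            + instanceRocksDBAbsolutePath.length()
                            + " characters; the relocation test requires at most "
                            + MAX_INSTANCE_PATH_LENGTH);
        }
    }
}
```

In the test, `requireRelocationPossible(instanceRocksDBPath.getAbsolutePath())` would take the place of the `Assume.assumeTrue(...)` line. The trade-off is the one discussed in this thread: on a CI machine with a deep temp directory the test would then fail rather than be skipped.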
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430885170

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);

Review Comment:
We don't need to clean up `taskManager.log`, right? If so, we can check that only the RocksDB logs are cleaned. (We should ensure that the RocksDB log cleanup doesn't delete other logs.)

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
Could we ensure that relocation always takes effect in the current test? If we cannot, this test will be skipped, and we won't catch it if the log cleanup logic breaks in the future. WDYT?
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1429554092

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
Resolved.
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
1996fanrui commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1427612626

##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
Could we let RocksDB create the LOG file and then call close() to clean it up? If so, we don't need to call `resolveRelocatedDbLogPrefix` in the test.

The current test just creates a file based on `resolveRelocatedDbLogPrefix` and then cleans it based on `resolveRelocatedDbLogPrefix`. If `resolveRelocatedDbLogPrefix` has a bug, or its rule doesn't match the one on the RocksDB side, this test cannot catch it.

Also, if we let RocksDB create the LOG file, the test will fail if RocksDB's log file naming rule changes in the future. WDYT?

##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java:
##
@@ -438,4 +451,44 @@ private File resolveFileLocation(String logFilePath) {
         File logFile = new File(logFilePath);
         return (logFile.exists() && logFile.canRead()) ? logFile : null;
     }
+
+    /** Clean all relocated rocksdb logs. */
+    private void cleanRelocatedDbLogs() {
+        if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) {
+            LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir);
+
+            String relocatedDbLogPrefix =
+                    resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath());
+            try {
+                Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                        .filter(
+                                path ->
+                                        !Files.isDirectory(path)
+                                                && path.toFile()
+                                                        .getName()
+                                                        .startsWith(relocatedDbLogPrefix))
+                        .forEach(IOUtils::deleteFileQuietly);
+            } catch (IOException e) {
+                LOG.warn(
+                        "Could not list relocated RocksDB log directory: {}",
+                        relocatedDbLogBaseDir);
+            }
+        }
+    }
+
+    /**
+     * Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See
+     * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30.
+     *
+     * @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located.
+     * @return Resolved rocksdb log name prefix.
+     */
+    public static String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) {

Review Comment:
Based on the previous comment, I'd prefer this method to be private. If it does end up public, please add the `@VisibleForTesting` annotation.
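For readers following the `resolveRelocatedDbLogPrefix` discussion: the rule referenced in its Javadoc (frocksdb `file/filename.cc`) derives the log-file prefix from the DB's absolute path. As I read that source, ASCII letters, digits, `-`, `.` and `_` are copied unchanged, any other character becomes `_` except at index 0, where it is dropped (typically the leading `/`), and `_LOG` is appended. `InfoLogPrefixSketch` below is a hypothetical re-implementation of that reading, not the code merged in the PR. It does not model the fixed-size buffer RocksDB writes the prefix into, which appears to be why the test guards on path length. It also illustrates the reviewer's point: if a test derives both the created file name and the cleanup prefix from the same helper, a mismatch with RocksDB's real rule goes unnoticed.

```java
/**
 * Hypothetical sketch of RocksDB's info-log prefix rule (frocksdb file/filename.cc), as read
 * from the source; not the implementation merged in the PR. Truncation of very long paths by
 * RocksDB's fixed-size buffer is deliberately not modeled.
 */
final class InfoLogPrefixSketch {
    private static final String SUFFIX = "_LOG";

    private InfoLogPrefixSketch() {}

    static String resolve(String dbAbsolutePath) {
        StringBuilder prefix = new StringBuilder(dbAbsolutePath.length() + SUFFIX.length());
        for (int i = 0; i < dbAbsolutePath.length(); i++) {
            char c = dbAbsolutePath.charAt(i);
            if (isKept(c)) {
                prefix.append(c);
            } else if (i > 0) {
                // Any other character after index 0 becomes '_', e.g. each inner '/'.
                prefix.append('_');
            }
            // A disallowed character at index 0 (typically the leading '/') is dropped.
        }
        return prefix.append(SUFFIX).toString();
    }

    /** ASCII letters, digits, '-', '.' and '_' are copied unchanged. */
    private static boolean isKept(char c) {
        return (c >= 'a' && c <= 'z')
                || (c >= 'A' && c <= 'Z')
                || (c >= '0' && c <= '9')
                || c == '-'
                || c == '.'
                || c == '_';
    }

    public static void main(String[] args) {
        // Prints tmp_flink_db_LOG
        System.out.println(resolve("/tmp/flink/db"));
    }
}
```

Under this reading, the active log of a DB at `/tmp/flink/db` would be named `tmp_flink_db_LOG` in the relocated directory, and rolled logs would carry an additional `.old.<timestamp>` suffix, so a plain string `startsWith(prefix)` filter matches both.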
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1855059115

@flinkbot run azure
Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
flinkbot commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1854091789

## CI report:

* 7c02b054dfcf713a87e92344bbc95d0d8bd12393 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]
liming30 opened a new pull request, #23922:
URL: https://github.com/apache/flink/pull/23922

## What is the purpose of the change

[FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited.

## Brief change log

When the task exits, delete all rocksdb log files with the same prefix in the relocated directory.

## Verifying this change

This change added tests and can be verified as follows:

- org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest#testCleanRelocatedDbLogs

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
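The change-log entry above (on task exit, delete every file in the relocated log directory that starts with the instance's log prefix) can be sketched with plain `java.nio.file`. `RelocatedLogCleaner` is a hypothetical stand-in for the cleanup the PR adds to `RocksDBResourceContainer`, not Flink code. Like the PR it is best-effort: I/O errors are swallowed here, where the PR logs a warning. It compares the file name as a `String`, because `Path.startsWith(String)` matches whole path elements rather than character prefixes and would miss rolled files such as `<prefix>.old.<timestamp>`. It also respects the review request that only RocksDB logs be removed, so `taskManager.log` and other instances' logs are left alone.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical sketch (not Flink code): on close, deletes the regular files in a relocated log
 * directory whose names start with one RocksDB instance's log prefix.
 */
final class RelocatedLogCleaner implements AutoCloseable {
    private final Path relocatedLogDir;
    private final String logPrefix;

    RelocatedLogCleaner(Path relocatedLogDir, String logPrefix) {
        this.relocatedLogDir = relocatedLogDir;
        this.logPrefix = logPrefix;
    }

    @Override
    public void close() {
        for (Path log : listOwnLogs()) {
            try {
                Files.deleteIfExists(log);
            } catch (IOException ignored) {
                // Best effort: an undeletable log file must not fail task shutdown.
            }
        }
    }

    /** Lists first and deletes afterwards, so the directory stream is closed before deleting. */
    private List<Path> listOwnLogs() {
        try (Stream<Path> entries = Files.list(relocatedLogDir)) {
            return entries.filter(Files::isRegularFile)
                    // String comparison: Path.startsWith(String) would only match whole names.
                    .filter(p -> p.getFileName().toString().startsWith(logPrefix))
                    .collect(Collectors.toList());
        } catch (IOException | UncheckedIOException e) {
            // Directory could not be listed; nothing is deleted.
            return List.of();
        }
    }
}
```

For example, calling `new RelocatedLogCleaner(dir, "tmp_flink_db_LOG").close()` on a directory holding `tmp_flink_db_LOG`, `tmp_flink_db_LOG.old.1700000000000000`, `taskManager.log` and `tmp_flink_other_LOG` leaves only the last two files in place.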