Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-14 Thread via GitHub


1996fanrui merged PR #24495:
URL: https://github.com/apache/flink/pull/24495


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-14 Thread via GitHub


1996fanrui merged PR #24494:
URL: https://github.com/apache/flink/pull/24494


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-14 Thread via GitHub


1996fanrui merged PR #24493:
URL: https://github.com/apache/flink/pull/24493


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-14 Thread via GitHub


liming30 commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996621199

   @flinkbot run azure


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


flinkbot commented on PR #24495:
URL: https://github.com/apache/flink/pull/24495#issuecomment-1996344641

   
   ## CI report:
   
   * d22b1166f0c742eda39ce0bcf214855683642668 UNKNOWN
   
   
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


flinkbot commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996344418

   
   ## CI report:
   
   * 908a6fb7d7400b0a3fa76a09452b1cbc79e1a972 UNKNOWN
   
   
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


flinkbot commented on PR #24494:
URL: https://github.com/apache/flink/pull/24494#issuecomment-1996344542

   
   ## CI report:
   
   * d9e9cbec2790dc80e276b04cef7c739410cdad04 UNKNOWN
   
   
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


liming30 commented on PR #24493:
URL: https://github.com/apache/flink/pull/24493#issuecomment-1996342006

   @flinkbot run azure


[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


liming30 opened a new pull request, #24494:
URL: https://github.com/apache/flink/pull/24494

   Backporting https://github.com/apache/flink/pull/23922 to release-1.18


[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


liming30 opened a new pull request, #24493:
URL: https://github.com/apache/flink/pull/24493

   Backporting #23922 to release-1.17


[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-13 Thread via GitHub


liming30 opened a new pull request, #24495:
URL: https://github.com/apache/flink/pull/24495

   Backporting https://github.com/apache/flink/pull/23922 to release-1.19


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2024-03-12 Thread via GitHub


1996fanrui merged PR #23922:
URL: https://github.com/apache/flink/pull/23922


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-18 Thread via GitHub


liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430990459


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
   I noticed that the length limit issue was being addressed in `frocksdb`, which would ensure that relocation always happens, but the related PR has made no further progress: https://github.com/ververica/frocksdb/pull/66



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-18 Thread via GitHub


liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430988502


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);

Review Comment:
   Good idea, I will resolve it in the next commit.



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-18 Thread via GitHub


liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430987609


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
   Sorry, I cannot guarantee that the length of `instanceRocksDBPath` will be at most `255 - "_LOG".length()` in this test, which means that relocation may not occur.
   
   Forcibly specifying an absolute path may cause problems on the test machine, such as the path not existing or missing permissions. Is there any other way to ensure that relocation will definitely happen?



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-18 Thread via GitHub


1996fanrui commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1430885170


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);

Review Comment:
   We don't need to clean up `taskManager.log`, right?
   
   If so, we can check that only the RocksDB logs are cleaned. (We should ensure the RocksDB log cleanup doesn't delete other logs.)
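One way to express the suggested check is to compare the directory listing before and after cleanup. The sketch below is hypothetical (`CleanupCheck` and `onlyPrefixedFilesRemoved` are not part of the PR); it assumes the listings are plain file names and that the RocksDB log prefix is already known, and the example names such as `tmp_db_LOG` are made up for illustration.

```java
import java.util.Set;

/** Hypothetical test helper: verifies that cleanup removed RocksDB logs and nothing else. */
final class CleanupCheck {
    private CleanupCheck() {}

    /**
     * Returns true iff every file whose name starts with {@code rocksDbLogPrefix} is gone,
     * every other file (for example taskManager.log) is still present, and no new file appeared.
     */
    static boolean onlyPrefixedFilesRemoved(
            Set<String> before, Set<String> after, String rocksDbLogPrefix) {
        for (String name : before) {
            boolean shouldBeDeleted = name.startsWith(rocksDbLogPrefix);
            // A RocksDB log that survived, or an unrelated file that vanished, fails the check.
            if (shouldBeDeleted == after.contains(name)) {
                return false;
            }
        }
        return before.containsAll(after);
    }

    public static void main(String[] args) {
        Set<String> before = Set.of("taskManager.log", "tmp_db_LOG", "tmp_db_LOG.old.1702");
        System.out.println(
                onlyPrefixedFilesRemoved(before, Set.of("taskManager.log"), "tmp_db_LOG")); // true
        System.out.println(onlyPrefixedFilesRemoved(before, Set.of(), "tmp_db_LOG")); // false
    }
}
```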



##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +365,58 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    @Timeout(value = 60)
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        Configuration conf = new Configuration();
+        conf.set(RocksDBConfigurableOptions.LOG_LEVEL, InfoLogLevel.DEBUG_LEVEL);
+        conf.set(RocksDBConfigurableOptions.LOG_FILE_NUM, 4);
+        conf.set(RocksDBConfigurableOptions.LOG_MAX_FILE_SIZE, MemorySize.parse("1kb"));
+        final EmbeddedRocksDBStateBackend rocksDbBackend =
+                new EmbeddedRocksDBStateBackend().configure(conf, getClass().getClassLoader());
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());

Review Comment:
   Could we ensure that the relocation always takes effect in the current test?
   
   If we cannot, this test will be skipped, and we won't catch it if the log cleanup logic breaks in the future.
   
   WDYT?



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-17 Thread via GitHub


liming30 commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1429554092


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
   Resolved.



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #23922:
URL: https://github.com/apache/flink/pull/23922#discussion_r1427612626


##
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
##
@@ -359,6 +361,50 @@ public void testDbPathRelativePaths() throws Exception {
         rocksDbBackend.setDbStoragePath("relative/path");
     }
 
+    @Test
+    public void testCleanRelocatedDbLogs() throws Exception {
+        final File folder = tempFolder.newFolder();
+        final File relocatedDBLogDir = tempFolder.newFolder("db_logs");
+        final File logFile = new File(relocatedDBLogDir, "taskManager.log");
+        Files.createFile(logFile.toPath());
+        System.setProperty("log.file", logFile.getAbsolutePath());
+
+        final String dbStoragePath = new Path(folder.toURI().toString()).toString();
+        final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
+        rocksDbBackend.setDbStoragePath(dbStoragePath);
+
+        final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
+        RocksDBKeyedStateBackend keyedBackend =
+                createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
+        // clear unused file
+        FileUtils.deleteFileOrDirectory(logFile);
+
+        File instanceBasePath = keyedBackend.getInstanceBasePath();
+        File instanceRocksDBPath =
+                RocksDBKeyedStateBackendBuilder.getInstanceRocksDBPath(instanceBasePath);
+
+        // avoid tests without relocate.
+        Assume.assumeTrue(instanceRocksDBPath.getAbsolutePath().length() <= 255 - "_LOG".length());
+
+        String relocatedDbLogPrefix =
+                RocksDBResourceContainer.resolveRelocatedDbLogPrefix(
+                        instanceRocksDBPath.getAbsolutePath());
+        java.nio.file.Path[] relocatedDbLogs;
+        try {
+            relocatedDbLogs = FileUtils.listDirectory(relocatedDBLogDir.toPath());
+            assertTrue(relocatedDbLogs[0].getFileName().startsWith(relocatedDbLogPrefix));
+            // add a rolled log file
+            Files.createTempFile(relocatedDBLogDir.toPath(), relocatedDbLogPrefix, ".suffix");

Review Comment:
   Could we let RocksDB create the LOG file and then call close() to clean it up?
   
   If so, we don't need to call `resolveRelocatedDbLogPrefix`. The current test just creates a file based on `resolveRelocatedDbLogPrefix` and cleans it up based on `resolveRelocatedDbLogPrefix`.
   
   If `resolveRelocatedDbLogPrefix` has a bug, or its rule doesn't match the one on the RocksDB side, this test cannot catch it.
   
   Also, if we let RocksDB create the LOG file, the test will fail if the RocksDB log file naming rule changes in the future.
   
   WDYT?



##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java:
##
@@ -438,4 +451,44 @@ private File resolveFileLocation(String logFilePath) {
         File logFile = new File(logFilePath);
         return (logFile.exists() && logFile.canRead()) ? logFile : null;
     }
+
+    /** Clean all relocated rocksdb logs. */
+    private void cleanRelocatedDbLogs() {
+        if (instanceRocksDBPath != null && relocatedDbLogBaseDir != null) {
+            LOG.info("Cleaning up relocated RocksDB logs: {}.", relocatedDbLogBaseDir);
+
+            String relocatedDbLogPrefix =
+                    resolveRelocatedDbLogPrefix(instanceRocksDBPath.getAbsolutePath());
+            try {
+                Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                        .filter(
+                                path ->
+                                        !Files.isDirectory(path)
+                                                && path.toFile()
+                                                        .getName()
+                                                        .startsWith(relocatedDbLogPrefix))
+                        .forEach(IOUtils::deleteFileQuietly);
+            } catch (IOException e) {
+                LOG.warn(
+                        "Could not list relocated RocksDB log directory: {}",
+                        relocatedDbLogBaseDir);
+            }
+        }
+    }
+
+    /**
+     * Resolve the prefix of rocksdb's log file name according to rocksdb's log file name rules. See
+     * https://github.com/ververica/frocksdb/blob/FRocksDB-6.20.3/file/filename.cc#L30.
+     *
+     * @param instanceRocksDBAbsolutePath The path where the rocksdb directory is located.
+     * @return Resolved rocksdb log name prefix.
+     */
+    public static String resolveRelocatedDbLogPrefix(String instanceRocksDBAbsolutePath) {

Review Comment:
   Based on the last comment, I'd prefer this method to be private.
   
   Also, if it ends up public, please add the `@VisibleForTesting` annotation.
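For readers unfamiliar with the naming rule referenced in the Javadoc above, here is a sketch of how a relocated log name could be derived from the instance path. It is our reading of the info-log prefix logic in frocksdb's `file/filename.cc`, not the code from this PR, and rests on assumptions: letters, digits, `-`, `.` and `_` are kept; any other character becomes `_`, except at index 0 where it is dropped; `_LOG` is appended; rolled files add `.old.<timestamp>`. RocksDB's truncation of very long paths is not modelled. `InfoLogPrefixSketch` and `relocatedLogPrefix` are hypothetical names; please verify the rule against the linked `filename.cc`.

```java
/** Hypothetical sketch of RocksDB's relocated info-log naming; not Flink or RocksDB API. */
final class InfoLogPrefixSketch {
    private InfoLogPrefixSketch() {}

    // Assumed set of characters that RocksDB copies into the prefix unchanged.
    private static boolean isKept(char c) {
        return (c >= 'a' && c <= 'z')
                || (c >= 'A' && c <= 'Z')
                || (c >= '0' && c <= '9')
                || c == '-'
                || c == '.'
                || c == '_';
    }

    /** Flattens an absolute DB path into a log file name, e.g. "/tmp/db" -> "tmp_db_LOG". */
    static String relocatedLogPrefix(String instanceRocksDBAbsolutePath) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < instanceRocksDBAbsolutePath.length(); i++) {
            char c = instanceRocksDBAbsolutePath.charAt(i);
            if (isKept(c)) {
                sb.append(c);
            } else if (i > 0) {
                // Separators and other characters become '_', except at index 0.
                sb.append('_');
            }
        }
        return sb.append("_LOG").toString();
    }

    public static void main(String[] args) {
        String prefix = relocatedLogPrefix("/tmp/flink-io/job_1/db");
        System.out.println(prefix); // tmp_flink-io_job_1_db_LOG
        // A rolled file (assumed pattern "<prefix>.old.<timestamp>") shares the same prefix.
        System.out.println((prefix + ".old.1702000000000000").startsWith(prefix)); // true
    }
}
```

This also illustrates the reviewer's point: if Flink re-implements this rule and it drifts from RocksDB's, a test that only uses the Flink-side rule for both creating and cleaning files would not notice.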



Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


liming30 commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1855059115

   @flinkbot run azure


Re: [PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


flinkbot commented on PR #23922:
URL: https://github.com/apache/flink/pull/23922#issuecomment-1854091789

   
   ## CI report:
   
   * 7c02b054dfcf713a87e92344bbc95d0d8bd12393 UNKNOWN
   
   
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


[PR] [FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited. [flink]

2023-12-13 Thread via GitHub


liming30 opened a new pull request, #23922:
URL: https://github.com/apache/flink/pull/23922

   
   
   
   ## What is the purpose of the change
   
[FLINK-33798][statebackend/rocksdb] automatically clean up rocksdb logs when the task exited.
   
   
   ## Brief change log
   
   When the task exits, delete all rocksdb log files with the same prefix in 
the relocated directory.
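A stand-alone sketch of this cleanup step using only `java.nio.file`, assuming the log prefix has already been resolved. It follows the shape of `cleanRelocatedDbLogs` from the review diff (skip directories, match on the file-name prefix, delete quietly) but is not the PR's code: `RelocatedLogCleanupSketch` and `cleanRelocatedLogs` are hypothetical names, and Flink's `FileUtils`/`IOUtils` helpers are replaced with JDK calls.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; not the code merged in this PR. */
final class RelocatedLogCleanupSketch {
    private RelocatedLogCleanupSketch() {}

    /**
     * Deletes non-directory entries of {@code logDir} whose file name starts with {@code prefix}
     * and returns how many were removed. Other files, e.g. taskManager.log, are left alone.
     */
    static int cleanRelocatedLogs(Path logDir, String prefix) {
        List<Path> matches = new ArrayList<>();
        // Collect matches first so the directory is not modified while it is being listed.
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(logDir)) {
            for (Path entry : entries) {
                if (!Files.isDirectory(entry)
                        && entry.getFileName().toString().startsWith(prefix)) {
                    matches.add(entry);
                }
            }
        } catch (IOException e) {
            // Listing failed (the PR logs a warning here); nothing is deleted.
            return 0;
        }
        int deleted = 0;
        for (Path match : matches) {
            try {
                if (Files.deleteIfExists(match)) {
                    deleted++;
                }
            } catch (IOException e) {
                // Best effort, like a "delete quietly" helper: keep going with the rest.
            }
        }
        return deleted;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("db_logs");
        Files.createFile(dir.resolve("taskManager.log"));
        Files.createFile(dir.resolve("tmp_db_LOG"));
        Files.createFile(dir.resolve("tmp_db_LOG.old.1702000000000000"));
        System.out.println(cleanRelocatedLogs(dir, "tmp_db_LOG")); // 2
        System.out.println(Files.exists(dir.resolve("taskManager.log"))); // true
    }
}
```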
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - org.apache.flink.contrib.streaming.state.RocksDBStateBackendConfigTest#testCleanRelocatedDbLogs
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   

