Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon merged PR #15616: URL: https://github.com/apache/kafka/pull/15616 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2074547860 Failed tests are unrelated.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2073911312 Retriggering CI build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15616/11/
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575754146
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
Review Comment:
Yes, I added more comments for it. Thank you.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575753935
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
Review Comment:
Fixed it. Thank you.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575591635
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
Review Comment:
This comment is no longer correct.
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
Review Comment:
I see. Makes sense. It's for L158's assertion, right? Could you add a comment here to explain it?
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575583127
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
Review Comment:
The default value is 1 minute. We want some files to get the `.deleted` suffix but not be removed too fast, because we want to wait for the dir movement to happen.
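For context on the `.deleted` suffix: a segment removed by retention is first renamed with that suffix, and the file is physically removed later, after `file.delete.delay.ms` (the `FILE_DELETE_DELAY_MS_CONFIG` topic config). The race this test targets can be sketched with plain java.nio. This is a hypothetical reproduction, not Kafka's code: the delayed task holds the old path, much like a segment that was already removed from `LocalLog` and therefore never sees the directory rename, so the `.deleted` file is orphaned once the `-future` directory is renamed. The directory names and the 1-second delay are illustrative assumptions.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class AsyncDeleteOrphanDemo {
    // Illustrative stand-in for the ".deleted" suffix used by Kafka's log layer.
    static final String DELETED_SUFFIX = ".deleted";

    /** Returns true if a ".deleted" file is left behind in the renamed directory. */
    static boolean runScenario(long fileDeleteDelayMs) throws Exception {
        Path root = Files.createTempDirectory("logdir");
        Path futureDir = Files.createDirectory(root.resolve("topic-0.abc123-future"));
        Path segment = Files.createFile(futureDir.resolve("00000000000000000000.log"));

        // Step 1: retention marks the segment for deletion by renaming it.
        Path marked = Files.move(segment, segment.resolveSibling(segment.getFileName() + DELETED_SUFFIX));

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            // Step 2: the physical delete is scheduled and captures the current path.
            ScheduledFuture<Boolean> pending = scheduler.schedule(
                    () -> Files.deleteIfExists(marked), fileDeleteDelayMs, TimeUnit.MILLISECONDS);

            // Step 3: before the delay elapses, the whole directory is renamed.
            Path renamed = Files.move(futureDir, root.resolve("topic-0"));

            // The delayed task targets the stale path, so it deletes nothing.
            boolean deleted = pending.get();
            return !deleted && Files.exists(renamed.resolve(marked.getFileName()));
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("orphan left behind: " + runScenario(1000));
    }
}
```

The rename in step 3 completes long before the one-second delay expires, so the sketch reliably leaves an orphan; with a much shorter delay the delete could win the race instead.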
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575558298
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
Review Comment:
Why 10 seconds here?
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1574551794
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
   val topic = "topic"
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")
Review Comment:
Yeah, I also set `ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG` to `1000`. Its default value is 5 minutes; without this override, we would have to wait 5 minutes after the initial task for the next one.
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 1)
Review Comment:
Yes, removed it. Thank you.
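The reasoning about `LOG_CLEANUP_INTERVAL_MS_CONFIG` is simple arithmetic if one assumes the retention check runs first after the initial task delay and then once per cleanup interval. A minimal Java sketch of that model; the helper is hypothetical and is not Kafka's scheduler.

```java
public class RetentionScheduleSketch {
    /**
     * Time (ms after startup) of the n-th run (n = 0 is the first) of a task that
     * starts after an initial delay and then repeats at a fixed period.
     */
    static long nthRunAtMs(long initialDelayMs, long periodMs, int n) {
        return initialDelayMs + (long) n * periodMs;
    }

    public static void main(String[] args) {
        // 5 s initial delay, as in the test override discussed above.
        System.out.println(nthRunAtMs(5_000, 1_000, 1));    // 6000   (1 s interval)
        System.out.println(nthRunAtMs(5_000, 300_000, 1));  // 305000 (5-minute default interval)
    }
}
```

Under this model, the second check lands 1 second after the first with the override but 5 minutes after it with the default, which is why both overrides matter for test runtime.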
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1574049613
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 1)
Review Comment:
nit: I think we can leave the timeout at its default value, i.e. just remove the third parameter.
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
   val topic = "topic"
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")
Review Comment:
Why do we need to wait 5 seconds for it? I think we can set it to 0 to speed up the test. WDYT?
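Passing `1` as the third argument makes the timeout 1 ms, so the wait effectively becomes a single check, which is why the nit suggests relying on the default. A minimal Java sketch of a poll-until-true helper with a default timeout, plus the `.deleted` file count the test polls on. The 15-second default, the 100 ms pause, and all names here are illustrative assumptions, not the actual values or API of Kafka's `TestUtils`.

```java
import java.io.File;
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {
    // Illustrative default timeout; not taken from Kafka's TestUtils.
    static final long DEFAULT_WAIT_MS = 15_000;

    /** Polls the condition until it is true or the timeout expires; returns the last result. */
    static boolean waitUntilTrue(BooleanSupplier condition, long waitMs, long pauseMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + waitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            Thread.sleep(Math.min(pauseMs, remaining));
        }
    }

    /** Same as above with the default timeout and a 100 ms pause between checks. */
    static boolean waitUntilTrue(BooleanSupplier condition) throws InterruptedException {
        return waitUntilTrue(condition, DEFAULT_WAIT_MS, 100);
    }

    /** Counts files in dir whose name ends with suffix; returns 0 if dir cannot be listed. */
    static long countWithSuffix(File dir, String suffix) {
        File[] files = dir.listFiles();
        if (files == null) {
            return 0;
        }
        long n = 0;
        for (File f : files) {
            if (f.getName().endsWith(suffix)) {
                n++;
            }
        }
        return n;
    }
}
```

Unlike the Scala test's `listFiles().count(...)`, the count helper tolerates a missing directory instead of throwing, which keeps the polling loop from failing if the partition directory is mid-rename.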
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
chia7712 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067966296
> thanks for the great suggestion. I took a look LogSegment#deleteIfExists and LogSegment#deleteTypeIfExists. If we want to handle fallback deletion in LocalLog, we may need to return true/false in that two functions. However, LogSegment#deleteIfExists uses Utils.tryAll to handle 4 try/catch blocks. If we want to return true/false, we need to refactor Utils.tryAll as well. Finally, LogSegment#deleteIfExists is not only used by LocalLog.deleteSegmentFiles, but also LocalLog.splitOverflowedSegment. We need to handle LocalLog.splitOverflowedSegment path, too. I think we can use another Jira to track the change. Thanks.

Agreed. Let's ship it first.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1573652117
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+    // The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 35000)
Review Comment:
Thank you. I set `LOG_INITIAL_TASK_DELAY_MS_CONFIG` to 5 seconds.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067936507
> This PR is good but it seems to me `LogSegment` should NOT guess the directory structure managed by upper class (i.e `LogManager`).
>
> It seems the root cause is caused by following steps:
>
> 1. the segments to be deleted removed from `LocalLog`
> 2. `LocalLog#renameDir` move whole folder
> 3. `LocalLog#renameDir` update the parent folder for all segments. However, the segments to be deleted are removed form inner collection already. Hence, some `Segment#log` has a stale file.
>
> If I understand correctly, another solution is that we pass a function to get latest dir when calling `deleteSegmentFiles` (
>
> https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904
>
> ). If deleting segment get not-found error, we call `updateParentDir` and delete it again.
> WDYT?

Hi @chia7712, thanks for the great suggestion. I took a look at `LogSegment#deleteIfExists` and `LogSegment#deleteTypeIfExists`. If we want to handle the fallback deletion in `LocalLog`, those two functions may need to return true/false. However, `LogSegment#deleteIfExists` uses `Utils.tryAll` to handle 4 try/catch blocks, so returning true/false would require refactoring `Utils.tryAll` as well. Finally, `LogSegment#deleteIfExists` is used not only by `LocalLog.deleteSegmentFiles` but also by `LocalLog.splitOverflowedSegment`, so we would need to handle the `LocalLog.splitOverflowedSegment` path too. I think we can use another Jira to track the change. Thanks.
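To make the `Utils.tryAll` point concrete: a helper that runs several cleanup actions and rethrows the first failure does not, by itself, tell the caller whether each action actually deleted something. The following is a hypothetical Java sketch of a status-reporting variant; it is not `Utils.tryAll`'s real signature (which this thread does not show), and the names are assumptions.

```java
import java.util.List;

public class TryAllSketch {
    /** A deletion-like action that reports whether it actually removed something. */
    @FunctionalInterface
    interface BooleanAction {
        boolean run() throws Exception;
    }

    /**
     * Runs every action even if an earlier one throws. The first exception is rethrown with
     * later ones attached as suppressed; otherwise returns whether every action returned true.
     */
    static boolean tryAllReportingStatus(List<BooleanAction> actions) throws Exception {
        Exception first = null;
        boolean allSucceeded = true;
        for (BooleanAction action : actions) {
            try {
                allSucceeded &= action.run();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
        return allSucceeded;
    }
}
```

A caller such as a fallback-deletion path could then retry only when the result is `false`, which is the extra plumbing the comment above says would need its own Jira.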
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
chia7712 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1573271296
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+    // The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 35000)
Review Comment:
@FrankYang0529 #15719 is merged now. Please rebase your code :)
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
chia7712 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066277923
This PR is good, but it seems to me that `LogSegment` should NOT guess the directory structure managed by an upper-layer class (i.e. `LogManager`). The root cause seems to be the following sequence:
1. The segments to be deleted are removed from `LocalLog`.
2. `LocalLog#renameDir` moves the whole folder.
3. `LocalLog#renameDir` updates the parent folder for all segments. However, the segments to be deleted have already been removed from the inner collection, so some `Segment#log` instances still point at a stale file.
If I understand correctly, another solution is to pass a function that returns the latest dir when calling `deleteSegmentFiles` (https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904). If deleting a segment gets a not-found error, we call `updateParentDir` and delete it again. WDYT?
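The alternative proposed here (pass something that yields the log's latest directory, and retry on not-found) can be sketched with plain java.nio. This is a minimal, hypothetical illustration: the method name and the `Supplier<Path>` parameter are assumptions, and it re-resolves the file name against the current directory instead of calling `updateParentDir` on a real `LogSegment`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

public class DeleteWithLatestDirSketch {
    /**
     * Deletes a segment file. If it is no longer at its recorded location (for example because
     * the log directory was renamed after the segment was removed from the log), re-resolves
     * the file name against the log's current directory and tries once more.
     */
    static boolean deleteWithLatestDir(Path recordedFile, Supplier<Path> latestDir) throws IOException {
        if (Files.deleteIfExists(recordedFile)) {
            return true;
        }
        Path relocated = latestDir.get().resolve(recordedFile.getFileName());
        if (relocated.equals(recordedFile)) {
            return false; // same location: the file really does not exist
        }
        return Files.deleteIfExists(relocated);
    }
}
```

The design keeps directory knowledge with the caller that owns it: the deleting code never parses directory names, it only asks for the current location when the recorded one turns out to be stale.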
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066130114 @johnnychhsu, do you have any other comments? I'll merge this over the weekend if there are no other comments. Thanks.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2063996320
> @FrankYang0529 , there is checkstyle error: `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName] `
>
> Please help fix it. Thanks.

Fixed it, and verified that `./gradlew checkstyleMain checkstyleTest` passes on my laptop. Thank you.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on PR #15616: URL: https://github.com/apache/kafka/pull/15616#issuecomment-2062889530 @FrankYang0529, there is a checkstyle error: `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName]` Please help fix it. Thanks.
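To see why checkstyle rejects the field name, this snippet applies the `ConstantName` regex copied from the error message to a few candidate names. A `static final` field spelled in upper snake case (for example `FUTURE_DIR_PATTERN`, a name chosen here for illustration) satisfies it. This only demonstrates the regex; it is not how checkstyle itself runs the check.

```java
import java.util.regex.Pattern;

final class ConstantNameCheck {
    // Regex copied verbatim from the checkstyle [ConstantName] error above.
    static final Pattern CONSTANT_NAME =
        Pattern.compile("(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)");

    static boolean isValidConstantName(String name) {
        return CONSTANT_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidConstantName("futureDirPattern"));   // false: starts lowercase
        System.out.println(isValidConstantName("FUTURE_DIR_PATTERN")); // true: upper snake case
        System.out.println(isValidConstantName("log"));                // true: explicit exception
    }
}
```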
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1569846240
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
Review Comment: Sounds good.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1569024432
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
Review Comment: I think we can create a file like `LogDirUtils.java` later, so that we don't need to define the same variables, such as `FutureDirPattern` and `FutureDirSuffix`, in both `LogSegment.java` and `LocalLog.scala`. WDYT?
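A rough sketch of the proposed shared holder, assuming a hypothetical `LogDirUtils` class (no such class is confirmed by this thread). The suffix and regex mirror the ones quoted in the review; building the pattern from the suffix keeps the two in sync, which is the point of sharing them between `LogSegment.java` and `LocalLog.scala`.

```java
import java.io.File;
import java.util.regex.Pattern;

// Hypothetical LogDirUtils; values mirror the "-future" suffix and regex quoted in this thread.
final class LogDirUtils {
    // Suffix of a replica directory that is being moved to another log dir.
    static final String FUTURE_DIR_SUFFIX = "-future";

    // Matches "<prefix>-<partition>.<uniqueId>-future"; derived from the suffix constant.
    static final Pattern FUTURE_DIR_PATTERN =
        Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + Pattern.quote(FUTURE_DIR_SUFFIX));

    private LogDirUtils() { }

    // True if the directory's own name marks it as a future replica directory.
    static boolean isFutureDir(File dir) {
        return dir.getName().endsWith(FUTURE_DIR_SUFFIX);
    }
}
```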
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568897953
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
Review Comment: Updated it. Thanks for the review.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568266346
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
Review Comment: nit: The `future` string can be replaced with a variable. The regex can also become a variable, to make it more readable.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1568241856
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
Review Comment:
> We can use topicPartition, but we use similar regular expression in other place. Do we also need to update it?

Thanks for pointing these out to me. I was wrong. I tried matching topic names containing `.` or `-`, and both matched correctly, so we can use this regex directly.
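The observation above can be checked directly. In this sketch (class and method names are illustrative), the greedy first group backtracks to the last `-` that still leaves `<partition>.<id>-future`, so dots and dashes inside the topic name end up in group 1. The example paths and the `abc123` id are made up. One caveat of `\S+`: a log dir path containing whitespace would not match at all.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FutureDirParseDemo {
    // Same regex as the final version quoted in this thread.
    static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    // Returns the current topic-partition directory path, or null if the input is not a future dir.
    static String topicPartitionDir(String futureDirPath) {
        Matcher m = FUTURE_DIR_PATTERN.matcher(futureDirPath);
        return m.matches() ? m.group(1) + "-" + m.group(2) : null;
    }

    public static void main(String[] args) {
        System.out.println(topicPartitionDir("/logs/my.topic-0.abc123-future"));  // /logs/my.topic-0
        System.out.println(topicPartitionDir("/logs/my-topic-12.abc123-future")); // /logs/my-topic-12
        System.out.println(topicPartitionDir("/logs/my-topic-12"));               // null
    }
}
```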
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1567555940
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
Review Comment: I think `logIfMissing` only controls logging; it is not a flag for fallback deletion. We make a best effort to remove orphan files even if `logIfMissing` is false.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1567544147
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
Review Comment:
1. Yeah, I confused the file name with the folder name. I removed the `-delete` folder case.
2. We can use `topicPartition`, but we use a similar regular expression elsewhere. Do we also need to update it? https://github.com/apache/kafka/blob/269b457d30940e51f532d6a1b616b9506f87232f/core/src/main/scala/kafka/log/LocalLog.scala#L586-L588
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1567531204
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
Review Comment: Sorry, after reading the next comment, I realized I had misunderstood this one: one refers to the file name and the other to the folder name. Yes, it's better to check that the file name ends with `.deleted` before falling back to deletion, so we don't accidentally delete other files.
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
+LOGGER.warn("Fallback to delete {} {}.", fileType, fallbackFile.getAbsolutePath());
Review Comment: Yes, updated it.
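Pulling the review points together, here is a simplified, standalone sketch of the fallback: `logIfMissing` only gates the log line, the fallback runs only for names ending in `.deleted`, the current directory is derived from the `-future` parent with the thread's regex, and success is logged at info level. It is not the merged Kafka code; `FallbackDelete` is a hypothetical name, `java.util.logging` stands in for Kafka's SLF4J logger, and `Files.deleteIfExists` stands in for the `StorageAction`.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch of the fallback discussed in this thread (not the merged Kafka code).
final class FallbackDelete {
    private static final Logger LOG = Logger.getLogger(FallbackDelete.class.getName());
    private static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");
    private static final String DELETED_FILE_SUFFIX = ".deleted";

    private FallbackDelete() { }

    static boolean deleteIfExists(File file, boolean logIfMissing) throws IOException {
        if (Files.deleteIfExists(file.toPath())) {
            LOG.info("Deleted " + file.getAbsolutePath());
            return true;
        }
        // logIfMissing only controls this message; the fallback below runs either way.
        if (logIfMissing) {
            LOG.info("Failed to delete " + file.getAbsolutePath() + " because it does not exist.");
        }
        // Only retry for files already marked for deletion, so unrelated files are never touched.
        if (!file.getName().endsWith(DELETED_FILE_SUFFIX) || file.getParent() == null) {
            return false;
        }
        Matcher m = FUTURE_DIR_PATTERN.matcher(file.getParent());
        if (!m.matches()) {
            return false;
        }
        // ".../topic-0.<id>-future/x.deleted" -> ".../topic-0/x.deleted"
        File fallback = new File(m.group(1) + "-" + m.group(2), file.getName());
        if (Files.deleteIfExists(fallback.toPath())) {
            LOG.info("Fallback to delete " + fallback.getAbsolutePath());
            return true;
        }
        return false;
    }
}
```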
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1567414173
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
Review Comment: No, I use `"^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"` as the regular expression, so the file name can end with `-delete` or `-future`. The `.delete()` call here returns true if the file is successfully deleted.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1567406313
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
}
+ @Test
+ def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+ TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+ "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+// The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+TestUtils.waitUntilTrue(() => {
+ new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+}, "timed out waiting for log segment to retention", 35000)
Review Comment: Yeah, the first retention run takes a long time. I will update the test case after https://github.com/apache/kafka/pull/15719 is merged. Thank you.
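The 35-second timeout in the test follows from simple timing: the first retention pass only starts after the 30-second initial task delay, so the wait must cover that delay plus some slack. A hypothetical Java polling helper in the spirit of the test's `waitUntilTrue` (not Kafka's `TestUtils`) shows the mechanics: poll the condition, sleep between attempts, and fail once the deadline passes.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper, similar in spirit to the waitUntilTrue call in the test above.
final class Waits {
    private Waits() { }

    static void waitUntilTrue(BooleanSupplier condition, String message, long timeoutMs, long pauseMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            Thread.sleep(Math.max(1L, pauseMs));
        }
    }
}
```

With a condition that first becomes true around the 30-second mark, a 35 000 ms timeout leaves about five seconds of slack for the retention pass and the async delete to finish.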
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
johnnychhsu commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1565898495
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
Review Comment: Why can't we just add the fallback logic inside `if (logIfMissing)`, instead of adding a new if-else block inside this else block?
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1562184559
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
Review Comment: Does the file name always end with `.delete`? Should we check it before deletion?
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
Review Comment:
1. Why does it contain `delete` at the end?
2. Unfortunately, the topic name could contain `-` or `.`, so it's unsafe to use a regex like this. I'm thinking we can pass `topicPartition` as a parameter into `deleteTypeIfExists` so that we don't need a regex like this, and just verify `fileName.endsWith("future")`, because a normal folder name should always end with a number (the partition number) rather than "future". WDYT?
## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
try {
if (delete.execute())
LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid orphan file.
+Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+Matcher dirMatcher = dirPattern.matcher(file.getParent());
+if (dirMatcher.matches()) {
+String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+if (fallbackFile.exists() && fallbackFile.delete()) {
+LOGGER.warn("Fallback to delete {} {}.", fileType, fallbackFile.getAbsolutePath());
Review Comment: Why did we use `warn` here? I think we can use `info` since it's expected behavior. WDYT?
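The second suggestion above avoids parsing the topic out of the path altogether. A minimal sketch, assuming the topic and partition are passed in (here as plain `String`/`int` rather than Kafka's `TopicPartition`, and with the hypothetical name `TopicPartitionFallback`): only the parent folder's `-future` suffix is checked, and the current folder is rebuilt as `<logDir>/<topic>-<partition>`.

```java
import java.io.File;

// Sketch of the suggested alternative: use the known topic-partition instead of a regex.
final class TopicPartitionFallback {
    private TopicPartitionFallback() { }

    /**
     * For a file under "<logDir>/<anything>-future", returns the same file name under
     * "<logDir>/<topic>-<partition>"; returns null otherwise. A regular partition
     * folder ends with the partition number, never with "future".
     */
    static File fallbackFile(File file, String topic, int partition) {
        File parent = file.getParentFile();
        if (parent == null || !parent.getName().endsWith("-future") || parent.getParentFile() == null) {
            return null;
        }
        File currentDir = new File(parent.getParentFile(), topic + "-" + partition);
        return new File(currentDir, file.getName());
    }
}
```

Because the topic name is supplied rather than parsed, dots and dashes in it no longer matter.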
## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
}
+ @Test
+ def
[PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 opened a new pull request, #15616: URL: https://github.com/apache/kafka/pull/15616
*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*
*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)