Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-24 Thread via GitHub


showuon merged PR #15616:
URL: https://github.com/apache/kafka/pull/15616


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-24 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2074547860

   Failed tests are unrelated.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2073911312

   Retriggering CI build:
   https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15616/11/





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575754146


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")

Review Comment:
   Yes, I added more comments for it. Thank you.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575753935


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.

Review Comment:
   Fixed it. Thank you.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575591635


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.

Review Comment:
   This comment is not correct now.



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")

Review Comment:
   I see. Makes sense. It's for L158's assertion, right? Could you add a comment 
here to explain it?






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575583127


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")

Review Comment:
   The default value is 1 minute. We want some files to get the `.deleted` 
suffix, but not to be removed too quickly, because we need to wait for the 
directory movement to happen.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575558298


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")

Review Comment:
   Why 10 seconds here?






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1574551794


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 
   val topic = "topic"
 
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")

Review Comment:
   Yeah, I also set `ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG` to 
`1000`. The default value is 5 minutes. Without the override, we would have to 
wait 5 minutes after the initial task for the next one.
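
   The timing argument in the comment above can be checked with a little 
arithmetic. The following is only a sketch: it assumes the retention task runs 
at a fixed rate after an initial delay, and the class and method names are 
made up for illustration. The 5-second delay and the 1000 ms / 5 minute 
intervals are the values mentioned in this thread.

```java
public class CleanupScheduleSketch {

    /** Start time in ms (relative to broker startup) of the n-th run of a fixed-rate task; n starts at 1. */
    public static long nthRunAtMs(long initialDelayMs, long intervalMs, int n) {
        return initialDelayMs + (n - 1) * intervalMs;
    }

    public static void main(String[] args) {
        long initialDelayMs = 5_000L;             // initial task delay from the diff in this thread
        long defaultIntervalMs = 5 * 60 * 1_000L; // "5 minutes", as stated in the comment
        long overriddenIntervalMs = 1_000L;       // the override described in the comment

        // Second run with the default interval vs. with the override.
        System.out.println(nthRunAtMs(initialDelayMs, defaultIntervalMs, 2));    // 305000
        System.out.println(nthRunAtMs(initialDelayMs, overriddenIntervalMs, 2)); // 6000
    }
}
```

   Under that assumption, a segment that only becomes eligible for deletion 
shortly after the first run is picked up about one second later with the 
override, instead of about five minutes later.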



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 10000)

Review Comment:
   Yes, removed it. Thank you.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1574049613


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+    // The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 10000)

Review Comment:
   nit: I think we can leave the timeout at its default value, i.e. simply 
remove the 3rd parameter.
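
   For readers unfamiliar with this kind of helper, here is a rough sketch of 
a polling wait with an optional timeout. It is not Kafka's `TestUtils` 
implementation: `WaitUntilSketch`, `DEFAULT_WAIT_MS` and `POLL_INTERVAL_MS` 
are hypothetical names and values chosen for illustration only.

```java
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {

    // Hypothetical values for this sketch; not taken from Kafka's TestUtils.
    public static final long DEFAULT_WAIT_MS = 15_000L;
    public static final long POLL_INTERVAL_MS = 10L;

    /** Polls with the default timeout; this is what dropping the 3rd argument amounts to. */
    public static void waitUntilTrue(BooleanSupplier condition, String message) {
        waitUntilTrue(condition, message, DEFAULT_WAIT_MS);
    }

    /** Polls until the condition holds, or fails with the message once waitTimeMs has elapsed. */
    public static void waitUntilTrue(BooleanSupplier condition, String message, long waitTimeMs) {
        long start = System.nanoTime();
        while (!condition.getAsBoolean()) {
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            if (elapsedMs >= waitTimeMs) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting: " + message);
            }
        }
    }

    public static void main(String[] args) {
        long readyAt = System.nanoTime() + 50_000_000L; // condition turns true after ~50 ms
        waitUntilTrue(() -> System.nanoTime() - readyAt >= 0, "condition never became true");
        System.out.println("condition met before the default timeout");
    }
}
```

   Because the helper returns as soon as the condition holds, a generous 
default timeout only costs time when the test is going to fail anyway.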



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 
   val topic = "topic"
 
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")

Review Comment:
   Why do we need to wait 5 secs for it? I would say we can set it to 0 to 
speed up the test. WDYT?






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


chia7712 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067966296

   > thanks for the great suggestion. I took a look at LogSegment#deleteIfExists 
and LogSegment#deleteTypeIfExists. If we want to handle fallback deletion in 
LocalLog, we may need to return true/false from those two functions. However, 
LogSegment#deleteIfExists uses Utils.tryAll to handle 4 try/catch blocks. If we 
want to return true/false, we need to refactor Utils.tryAll as well. Finally, 
LogSegment#deleteIfExists is used not only by LocalLog.deleteSegmentFiles but 
also by LocalLog.splitOverflowedSegment, so we need to handle the 
LocalLog.splitOverflowedSegment path, too. I think we can use another Jira to 
track the change. Thanks.
   
   Agreed. Let's ship it first.
   





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1573652117


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+    // The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 35000)

Review Comment:
   Thank you. I updated `LOG_INITIAL_TASK_DELAY_MS_CONFIG` to 5 seconds.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-21 Thread via GitHub


FrankYang0529 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067936507

   > This PR is good but it seems to me `LogSegment` should NOT guess the 
directory structure managed by upper class (i.e `LogManager`).
   > 
   > It seems the root cause is caused by following steps:
   > 
   > 1. the segments to be deleted removed from `LocalLog`
   > 2. `LocalLog#renameDir` move whole folder
   > 3. `LocalLog#renameDir` update the parent folder for all segments. 
However, the segments to be deleted are removed from inner collection already. 
Hence, some `Segment#log` has a stale file.
   > 
   > If I understand correctly, another solution is that we pass a function to 
get latest dir when calling `deleteSegmentFiles` (
   > 
   > 
https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904
   > 
   > ). If deleting segment get not-found error, we call `updateParentDir` and 
delete it again.
   > WDYT?
   
   Hi @chia7712, thanks for the great suggestion. I took a look at 
`LogSegment#deleteIfExists` and `LogSegment#deleteTypeIfExists`. If we want to 
handle fallback deletion in `LocalLog`, we may need to return true/false from 
those two functions. However, `LogSegment#deleteIfExists` uses `Utils.tryAll` to 
handle 4 try/catch blocks. If we want to return true/false, we need to refactor 
`Utils.tryAll` as well. Finally, `LogSegment#deleteIfExists` is used not only 
by `LocalLog.deleteSegmentFiles` but also by `LocalLog.splitOverflowedSegment`, 
so we need to handle the `LocalLog.splitOverflowedSegment` path, too. I think 
we can use another Jira to track the change. Thanks.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-20 Thread via GitHub


chia7712 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1573271296


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "10000")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+    // The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 35000)

Review Comment:
   @FrankYang0529 #15719 is merged now. Please rebase your code :)






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066277923

   This PR is good, but it seems to me that `LogSegment` should NOT guess the 
directory structure managed by an upper-level class (i.e. `LogManager`).
   
   The root cause seems to be the following sequence of steps:
   1. The segments to be deleted are removed from `LocalLog`.
   2. `LocalLog#renameDir` moves the whole folder.
   3. `LocalLog#renameDir` updates the parent folder for all segments. However, 
the segments to be deleted have already been removed from the inner collection. 
Hence, some `Segment#log` still points to a stale file.
   
   If I understand correctly, another solution is to pass a function that 
returns the latest dir when calling `deleteSegmentFiles` 
(https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904).
 If deleting a segment gets a not-found error, we call `updateParentDir` and 
delete it again.
   
   WDYT?
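
   The stale-path situation and the proposed fallback can be reproduced 
outside Kafka with plain `java.nio.file` calls. This is only an illustrative 
sketch, not the `LocalLog` or `LogSegment` code: `StaleSegmentPathSketch`, 
`deleteWithFallback`, the directory names and the segment file name are all 
made up here, and a `Supplier<Path>` stands in for the "function to get latest 
dir".

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

public class StaleSegmentPathSketch {

    /** Deletes via the cached path first; on a miss, re-resolves the file name against the latest dir. */
    public static boolean deleteWithFallback(Path cachedPath, Supplier<Path> latestDir) throws IOException {
        if (Files.deleteIfExists(cachedPath)) {
            return true;
        }
        return Files.deleteIfExists(latestDir.get().resolve(cachedPath.getFileName()));
    }

    /** Returns {deleted via stale path, deleted via fallback, file still present afterwards}. */
    public static boolean[] demo() {
        try {
            Path root = Files.createTempDirectory("stale-segment-sketch");
            Path oldDir = Files.createDirectory(root.resolve("topic-0"));
            // Plays the role of a segment already dropped from the in-memory collection,
            // so nothing updates this reference when the directory is moved.
            Path segment = Files.createFile(oldDir.resolve("00000000000000000000.log.deleted"));

            // Simulates the directory move; `segment` still points into the old location.
            Path newDir = Files.move(oldDir, root.resolve("topic-0-moved"));

            boolean viaStalePath = Files.deleteIfExists(segment);
            boolean viaFallback = deleteWithFallback(segment, () -> newDir);
            boolean stillPresent = Files.exists(newDir.resolve(segment.getFileName()));

            // Clean up the temporary directories.
            Files.delete(newDir);
            Files.delete(root);
            return new boolean[] {viaStalePath, viaFallback, stillPresent};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("stale=" + r[0] + ", fallback=" + r[1] + ", leftover=" + r[2]);
    }
}
```

   In the sketch the delete through the stale path silently finds nothing, 
which mirrors how orphaned files can be left behind, while the retry against 
the current directory removes the file.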





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-19 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2066130114

   @johnnychhsu, do you have any other comments? I'll merge this at the 
weekend if there are no other comments. Thanks.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-18 Thread via GitHub


FrankYang0529 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2063996320

   > @FrankYang0529 , there is checkstyle error: `[2024-04-17T14:04:27.072Z] 
[ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34:
 Name 'futureDirPattern' must match pattern 
'(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName] `
   > 
   > Please help fix it. Thanks.
   
   Fixed it, and I verified that `./gradlew checkstyleMain checkstyleTest` 
passes on my laptop. Thank you.
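
For reference, the `ConstantName` rule can be reproduced with plain `java.util.regex`. This is only a sketch: it uses the pattern quoted in the error above, and `FUTURE_DIR_PATTERN` is an assumed replacement name, not necessarily the one used in the PR.

```java
import java.util.regex.Pattern;

class ConstantNameCheckSketch {
    // Pattern copied from the checkstyle error message quoted above.
    static final Pattern CONSTANT_NAME =
        Pattern.compile("(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)");

    static boolean isValidConstantName(String name) {
        return CONSTANT_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        // camelCase name from the error message: rejected by the rule.
        System.out.println(isValidConstantName("futureDirPattern"));
        // Assumed UPPER_SNAKE_CASE replacement: accepted by the rule.
        System.out.println(isValidConstantName("FUTURE_DIR_PATTERN"));
    }
}
```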





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


showuon commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2062889530

   @FrankYang0529 , there is a checkstyle error:
   `[2024-04-17T14:04:27.072Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15616/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:70:34: Name 'futureDirPattern' must match pattern '(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)'. [ConstantName]`
   
   Please help fix it. Thanks.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1569846240


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   Sounds good.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1569024432


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   I think we can create a file like `LogDirUtils.java` in the future, so we 
don't need to define the same variables, such as `FutureDirPattern` and 
`FutureDirSuffix`, in both `LogSegment.java` and `LocalLog.scala`. WDYT?






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568897953


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   Updated it. Thanks for the review.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568266346


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   nit: The `future` string can be extracted into a variable. The regex can 
also become a variable, to make it more readable.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568241856


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

Review Comment:
   > We can use topicPartition, but we use similar regular expression in other 
place. Do we also need to update it?
   
   Thanks for pointing this out. I was wrong. I tried matching topic names 
containing `.` or `-`, and both match correctly. That means we can use this 
regex directly.
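
A small sketch of that check, using the `-future`-only form of the pattern discussed in this thread. The class name, method name, and sample directory names (including the `abc123` suffix) are made up for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FutureDirRegexSketch {
    static final Pattern FUTURE_DIR = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    // Returns "<topic>-<partition>" for a future directory name, or null if it does not match.
    static String topicPartitionDir(String dirName) {
        Matcher m = FUTURE_DIR.matcher(dirName);
        // The first group is greedy, so it absorbs any '.' or '-' inside the topic name.
        return m.matches() ? m.group(1) + "-" + m.group(2) : null;
    }

    public static void main(String[] args) {
        System.out.println(topicPartitionDir("my.topic-name-0.abc123-future"));
        System.out.println(topicPartitionDir("a-b.c-1.abc123-future"));
        System.out.println(topicPartitionDir("my.topic-name-0"));
    }
}
```

The greedy first group backtracks to the last `-` that is still followed by a `.` separator, which is why dots and dashes in the topic name end up in the topic part rather than splitting it.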






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567555940


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {

Review Comment:
   I think `logIfMissing` only controls logging; it is not a flag for fallback 
deletion. We do our best to remove orphan files even if `logIfMissing` is false.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567544147


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

Review Comment:
   1. Yeah, I confused the file name with the folder name. Removed the 
`-delete` folder case.
   2. We can use `topicPartition`, but we use a similar regular expression in 
another place. Do we also need to update it?
   
   
https://github.com/apache/kafka/blob/269b457d30940e51f532d6a1b616b9506f87232f/core/src/main/scala/kafka/log/LocalLog.scala#L586-L588






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567531204


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   Sorry, after reading the next comment, I realized that I misunderstood this 
one. One is a file name and the other is a folder name. Yeah, it's better to 
check that the file name ends with `.deleted` before falling back to deletion, 
so we don't accidentally delete other files.
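
A hedged sketch of that guard: the fallback target is computed only when the file name ends with `.deleted` and the parent directory matches the `-future` pattern. `fallbackTarget` and the class name are hypothetical; the suffix value mirrors `LogFileUtils.DELETED_FILE_SUFFIX` as it is described in this thread.

```java
import java.io.File;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FallbackTargetSketch {
    // Assumed to mirror LogFileUtils.DELETED_FILE_SUFFIX.
    static final String DELETED_FILE_SUFFIX = ".deleted";
    static final Pattern FUTURE_DIR = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

    // Hypothetical helper: where the file would live after the future dir replaced the current one.
    static Optional<File> fallbackTarget(File file) {
        // Guard 1: only files already marked for deletion are eligible.
        if (!file.getName().endsWith(DELETED_FILE_SUFFIX)) {
            return Optional.empty();
        }
        // Guard 2: the recorded parent must look like a "-future" directory.
        String parent = file.getParent();
        if (parent == null) {
            return Optional.empty();
        }
        Matcher m = FUTURE_DIR.matcher(parent);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new File(m.group(1) + "-" + m.group(2), file.getName()));
    }

    public static void main(String[] args) {
        File stale = new File("/tmp/kafka-logs/topic-0.abc123-future/00000000000000000000.log.deleted");
        System.out.println(fallbackTarget(stale));
    }
}
```

Returning an `Optional` keeps the decision (is a fallback allowed?) separate from the side effect (the actual delete), which makes the guard easy to unit test.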



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {
+                        LOGGER.warn("Fallback to delete {} {}.", fileType, fallbackFile.getAbsolutePath());

Review Comment:
   Yes, updated it.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567414173


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   No, I use `"^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)"` as the regular 
expression, so the file name can end with `-delete` or `-future`. The 
`.delete()` call here returns true if the file is successfully deleted.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-16 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1567406313


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+    val partitionNum = 1
+
+    // Alter replica dir before topic creation
+    val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+    val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+    val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+    // The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+    val tp = new TopicPartition(topic, 0)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+    assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+    val topicProperties = new Properties()
+    topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+    topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+    topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+    createTopic(topic, partitionNum, 1, topicProperties)
+    assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+    // send enough records to trigger log rolling
+    (0 until 20).foreach { _ =>
+      TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+    }
+    TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+      "timed out waiting for log segment to roll")
+
+    // Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds.
+    // The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough.
+    TestUtils.waitUntilTrue(() => {
+      new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+    }, "timed out waiting for log segment to retention", 35000)

Review Comment:
   Yeah, the first retention takes a long time. I will update the test case 
after https://github.com/apache/kafka/pull/15719 is merged. Thank you.






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1565898495


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {

Review Comment:
   Why can't we just add the fallback logic inside `if (logIfMissing)`, 
instead of adding a new if-else block inside this else block?






Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1562184559


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {

Review Comment:
   Does the file name always end with `.delete`? Should we check it before 
deletion?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");

Review Comment:
   1. Why does it contain `delete` at the end?
   2. Unfortunately, the topic name could contain `-` or `.`, so it's unsafe to 
use a regex like this.
   
   I'm thinking we can pass `topicPartition` as a parameter into 
`deleteTypeIfExists` so that we don't need this regex at all. We can then just 
verify `fileName.endsWith("future")`, because a normal folder name should 
always end with a number (the partition number) instead of "future". WDYT?
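
A rough sketch of that alternative, assuming the topic-partition name is passed in as a plain string. `fallbackDir`, `FUTURE_DIR_SUFFIX`, and the sample paths are illustrative names, not the PR's code.

```java
import java.io.File;
import java.util.Optional;

class TopicPartitionFallbackSketch {
    // Assumed suffix of a future replica directory.
    static final String FUTURE_DIR_SUFFIX = "-future";

    // Hypothetical helper: derive the current directory from the caller-supplied
    // topic-partition name instead of parsing it out of the directory name.
    static Optional<File> fallbackDir(File segmentDir, String topicPartition) {
        if (!segmentDir.getName().endsWith(FUTURE_DIR_SUFFIX)) {
            return Optional.empty();
        }
        // A null parent is allowed: File then treats topicPartition as a relative path.
        return Optional.of(new File(segmentDir.getParentFile(), topicPartition));
    }

    public static void main(String[] args) {
        File futureDir = new File("/data/my.topic-3.abc123-future");
        System.out.println(fallbackDir(futureDir, "my.topic-3"));
    }
}
```

Because the topic name never goes through a regex here, `.` and `-` in it need no special handling.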



##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
         try {
             if (delete.execute())
                 LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-            else if (logIfMissing)
-                LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+            else {
+                if (logIfMissing) {
+                    LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+                }
+
+                // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+                // Fallback to delete the file in the new directory to avoid orphan file.
+                Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-(delete|future)");
+                Matcher dirMatcher = dirPattern.matcher(file.getParent());
+                if (dirMatcher.matches()) {
+                    String topicPartitionAbsolutePath = dirMatcher.group(1) + "-" + dirMatcher.group(2);
+                    File fallbackFile = new File(topicPartitionAbsolutePath, file.getName());
+                    if (fallbackFile.exists() && fallbackFile.delete()) {
+                        LOGGER.warn("Fallback to delete {} {}.", fileType, fallbackFile.getAbsolutePath());

Review Comment:
   Why did we use `warn` here? I think we can use `info` since it's expected 
behavior. WDYT?



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
     assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def 

[PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-03-28 Thread via GitHub


FrankYang0529 opened a new pull request, #15616:
URL: https://github.com/apache/kafka/pull/15616

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

