showuon commented on a change in pull request #9178: URL: https://github.com/apache/kafka/pull/9178#discussion_r486751738
########## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ########## @@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging { assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size) } + @Test + def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) + val cleanerManager: LogCleanerManager = createCleanerManager(log) + + // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints + assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0)) + + cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset)) + // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get) + } + + @Test + def testUpdateCheckpointsShouldRemovePartitionData(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) + val cleanerManager: LogCleanerManager = createCleanerManager(log) + + // write some data into the cleaner-offset-checkpoint file + cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset)) + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get) + + // updateCheckpoints should remove the topicPartition data in the logDir + cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition)) + assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty) + } + + @Test + def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) + 
val cleanerManager: LogCleanerManager = createCleanerManager(log) + + // write some data into the cleaner-offset-checkpoint file in logDir and logDir2 + cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset)) + cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset)) + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get) + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get) + + cleanerManager.handleLogDirFailure(logDir.getAbsolutePath) + // verify the partition data in logDir is gone, and data in logDir2 is still there + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get) + assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty) + } + + @Test + def testMaybeTruncateCheckpointShouldTruncateData(): Unit = { + val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes) + val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact) + val cleanerManager: LogCleanerManager = createCleanerManager(log) + val lowerOffset = 1L + val higherOffset = 1000L + + // write some data into the cleaner-offset-checkpoint file in logDir + cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset)) + assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get) + + // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset) Review comment: I checked again and I think I was right. The truncation of the checkpoint file will happen only when the provided offset is smaller than the one in the checkpoint file. So the comment is correct. I just added an equal sign (<=) to make it more accurate. Thanks. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org