junrao commented on a change in pull request #11199: URL: https://github.com/apache/kafka/pull/11199#discussion_r687993862
########## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ########## @@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging { // may be cleaned val firstUncleanableDirtyOffset: Long = Seq( - // we do not clean beyond the first unstable offset - log.firstUnstableOffset, + // we do not clean beyond the last stable offset Review comment: This is an existing issue, but could we also update the comment on line 593 to mention the last stable offset? ########## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala ########## @@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging { while(log.numberOfSegments < 8) log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0) + log.updateHighWatermark(50) + + val lastCleanOffset = Some(0L) + val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds) + assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.") + assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.") Review comment: Since the description of the test says the offset is bounded by the LSO (last stable offset), should we change log.highWatermark to log.lastStableOffset and update the assertion message accordingly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org