junrao commented on a change in pull request #11199:
URL: https://github.com/apache/kafka/pull/11199#discussion_r687993862



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -595,8 +595,8 @@ private[log] object LogCleanerManager extends Logging {
     // may be cleaned
     val firstUncleanableDirtyOffset: Long = Seq(
 
-      // we do not clean beyond the first unstable offset
-      log.firstUnstableOffset,
+      // we do not clean beyond the last stable offset

Review comment:
       This is an existing issue, but could we also update the comment on line 593 
to mention the last stable offset?
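
For context, here is a minimal sketch of the pattern the diff above uses: several candidate upper bounds are collected in a `Seq` and the smallest one wins. The helper name, its parameters, and the `compactionLagBound` argument are hypothetical stand-ins for illustration only; this is not the code in `LogCleanerManager`.

```scala
object FirstUncleanableSketch {
  // Hypothetical helper: each argument is one candidate upper bound on cleaning.
  // The first uncleanable dirty offset is the smallest bound that is present.
  def firstUncleanableDirtyOffset(
      lastStableOffset: Long,            // we do not clean beyond the last stable offset
      activeSegmentBaseOffset: Long,     // assumed bound: start of the active segment
      compactionLagBound: Option[Long]   // assumed optional bound, None when not configured
  ): Long =
    Seq(
      Some(lastStableOffset),
      Some(activeSegmentBaseOffset),
      compactionLagBound
    ).flatten.min

  def main(args: Array[String]): Unit = {
    // The LSO is the tightest bound here.
    assert(firstUncleanableDirtyOffset(50L, 70L, None) == 50L)
    // An optional bound below the LSO takes over.
    assert(firstUncleanableDirtyOffset(50L, 70L, Some(30L)) == 30L)
  }
}
```

Under this reading, a comment describing the bounds should name the last stable offset explicitly, since it is one of the inputs to the minimum.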

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -541,6 +541,29 @@ class LogCleanerManagerTest extends Logging {
     while(log.numberOfSegments < 8)
       log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
 
+    log.updateHighWatermark(50)
+
+    val lastCleanOffset = Some(0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
+    assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
+    assertEquals(log.highWatermark, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the hwm.")

Review comment:
       Since the test description says the offset is bounded by the LSO, should we 
change log.highWatermark to log.lastStableOffset and update the assertion message accordingly?
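
To illustrate why the two expressions can give the same value in this test yet state different intent, here is a small sketch. It assumes the LSO is the first unstable offset when that offset lies below the high watermark, and the high watermark otherwise. The object and function names are hypothetical and the model is a simplification, not the actual log implementation.

```scala
object LastStableOffsetSketch {
  // Assumed model: the LSO never exceeds the high watermark, and it is held
  // back by the first unstable (still undecided) offset when one exists below it.
  def lastStableOffset(highWatermark: Long, firstUnstableOffset: Option[Long]): Long =
    firstUnstableOffset.filter(_ < highWatermark).getOrElse(highWatermark)

  def main(args: Array[String]): Unit = {
    // No open transaction: LSO equals the high watermark, so either assertion passes.
    assert(lastStableOffset(50L, None) == 50L)
    // An open transaction starting at offset 20 pulls the LSO below the high watermark.
    assert(lastStableOffset(50L, Some(20L)) == 20L)
  }
}
```

If that model holds, asserting against the LSO would keep the test valid even when the two values diverge, which matches the test description.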




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

