junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Yes, if replication-offset-checkpoint is corrupted, the HWM could temporarily be set below local-log-start-offset. I am still trying to understand the impact of that. In the common case, the restarted broker can't become the leader or serve reads until it has caught up, and by then the HWM will be up to date. In the rare case, the restarted broker is elected leader through an unclean election before it has caught up. Is this the case that you want to address?
   
   The jira also says:
   
   > If the high watermark is less than the local-log-start-offset, then the [UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358) method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.
   
   However, isn't the follower read bounded by logEndOffset rather than the HWM? Where does the follower read need to convert the HWM to metadata?
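The failure mode described in the jira, and the bound added in the diff, can be modeled in isolation. The following is a simplified sketch under stated assumptions, not the UnifiedLog implementation: offsets are plain `Long` values instead of `LogOffsetMetadata`, the local log is reduced to the base offsets of its segments, and `HighWatermarkSketch`, `toMetadata`, `boundHighWatermark`, `OffsetMetadata`, and `OffsetOutOfRange` are hypothetical names.

```scala
// Hypothetical, simplified model; NOT Kafka's UnifiedLog.
// Offsets are plain Longs and the local log is only the base offsets of its segments.
object HighWatermarkSketch {

  // Hypothetical stand-in for the OFFSET_OUT_OF_RANGE error described in the jira.
  final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

  // Hypothetical stand-in for LogOffsetMetadata: the offset plus the base offset
  // of the local segment that contains it.
  final case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long)

  // Resolving an offset to metadata needs a local segment, so an offset below the
  // first local segment (the local-log-start-offset) cannot be resolved.
  def toMetadata(offset: Long, segmentBaseOffsets: Seq[Long], logEndOffset: Long): OffsetMetadata = {
    require(segmentBaseOffsets.nonEmpty, "the local log must have at least one segment")
    val localLogStartOffset = segmentBaseOffsets.min
    if (offset < localLogStartOffset || offset > logEndOffset)
      throw new OffsetOutOfRange(
        s"offset $offset is outside the local range [$localLogStartOffset, $logEndOffset]")
    // The containing segment is the one with the largest base offset <= offset.
    OffsetMetadata(offset, segmentBaseOffsets.filter(_ <= offset).max)
  }

  // The bound from the diff: lower-bounded by local-log-start-offset,
  // upper-bounded by log-end-offset.
  def boundHighWatermark(suggested: Long, localLogStartOffset: Long, logEndOffset: Long): Long =
    math.min(math.max(suggested, localLogStartOffset), logEndOffset)
}
```

For example, with hypothetical local segments based at 31 and 41 and a log-end offset of 50, `toMetadata(20L, Seq(31L, 41L), 50L)` throws, while `toMetadata(boundHighWatermark(5L, 31L, 50L), Seq(31L, 41L), 50L)` resolves to offset 31 in the segment based at 31. This only illustrates the mechanism the jira describes; it does not answer which code path actually converts the HWM to metadata.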



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -318,6 +318,80 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment:
   This is moving HW below local-log-start-offset, not log-start-offset.
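The distinction in this comment can be made concrete by classifying a candidate high watermark against the three offsets the test sets up (log-start-offset 11, local-log-start-offset 31, log-end-offset 50). This is an illustrative sketch only; `HighWatermarkRegion`, `classify`, and the region names are hypothetical and not part of Kafka.

```scala
// Hypothetical classification of a candidate high watermark; not Kafka code.
object HighWatermarkRegion {
  sealed trait Region
  // candidate < log-start-offset: below both start offsets.
  case object BelowLogStart extends Region
  // log-start-offset <= candidate < local-log-start-offset: with tiered storage these
  // offsets exist only remotely, so the candidate is below local-log-start-offset only.
  case object BelowLocalLogStartOnly extends Region
  // local-log-start-offset <= candidate <= log-end-offset: within the local log.
  case object WithinLocalLog extends Region
  // candidate > log-end-offset.
  case object AboveLogEnd extends Region

  def classify(candidate: Long, logStartOffset: Long, localLogStartOffset: Long, logEndOffset: Long): Region = {
    require(logStartOffset <= localLogStartOffset && localLogStartOffset <= logEndOffset,
      "expected logStartOffset <= localLogStartOffset <= logEndOffset")
    if (candidate < logStartOffset) BelowLogStart
    else if (candidate < localLogStartOffset) BelowLocalLogStartOnly
    else if (candidate <= logEndOffset) WithinLocalLog
    else AboveLogEnd
  }
}
```

With these values, 5 falls in `BelowLogStart`, while any value from 11 to 30 falls in `BelowLocalLogStartOnly`. Under the diff's bound both are reset to 31, so a case in the second range exercises the local-log-start-offset bound rather than the log-start-offset one.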



-- 
This is an automated message from the Apache Git Service.