junrao commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1562991673

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   Yes, if replication-offset-checkpoint is corrupted, the HWM could temporarily be set below local-log-start-offset. I am still trying to understand the impact of that. In the common case, the restarted broker can't become the leader or serve reads until it has caught up. By then, the HWM will be up to date. In the rare case, the restarted broker is elected leader through unclean leader election before it has caught up. Is this the case that you want to address?

   The JIRA also says:

   > If the high watermark is less than the local-log-start-offset, then the [UnifiedLog#fetchHighWatermarkMetadata](https://sourcegraph.com/github.com/apache/kafka@d4caa1c10ec81b9c87eaaf52b73c83d5579b68d3/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L358) method will throw the OFFSET_OUT_OF_RANGE error when it converts the offset to metadata. Once this error happens, the followers will receive out-of-range exceptions and the producers won't be able to produce messages since the leader cannot move the high watermark.

   However, isn't the follower read bounded by logEndOffset, not HWM?
   Where does the follower read need to convert HWM to metadata?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -318,6 +318,80 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset.

Review Comment:
   This is moving HW below local-log-start-offset, not log-start-offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
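
For readers following the thread, the bounds that the new test exercises can be sketched in isolation. This is a simplified stand-in and not the `UnifiedLog` code: `HighWatermarkBounds` and `clampHighWatermark` are hypothetical names, and plain `Long` offsets replace `LogOffsetMetadata`. The offsets 31 and 50 are taken from the test above; the value 25 is only an illustrative offset between log-start-offset (11) and local-log-start-offset (31).

```scala
// Hypothetical, simplified model of the bounds discussed in this thread.
// Plain Long offsets stand in for Kafka's LogOffsetMetadata.
object HighWatermarkBounds {

  /** Clamp a proposed high watermark to [localLogStartOffset, logEndOffset]. */
  def clampHighWatermark(proposed: Long, localLogStartOffset: Long, logEndOffset: Long): Long = {
    require(localLogStartOffset <= logEndOffset,
      "local-log-start-offset must not exceed log-end-offset")
    if (proposed < localLogStartOffset) localLogStartOffset // lower bound
    else if (proposed >= logEndOffset) logEndOffset         // upper bound
    else proposed
  }

  def main(args: Array[String]): Unit = {
    // local-log-start-offset = 31 and log-end-offset = 50, as in the test.
    println(clampHighWatermark(5L, 31L, 50L))  // below log-start-offset (11): prints 31
    println(clampHighWatermark(25L, 31L, 50L)) // between the two start offsets: prints 31
    println(clampHighWatermark(60L, 31L, 50L)) // past log-end-offset: prints 50
    println(clampHighWatermark(40L, 31L, 50L)) // already within bounds: prints 40
  }
}
```

Under this model, both an offset below log-start-offset and an offset between log-start-offset and local-log-start-offset resolve to local-log-start-offset, which is the distinction the second review comment draws.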