satishd commented on code in PR #15634: URL: https://github.com/apache/kafka/pull/15634#discussion_r1546167963
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None + @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None Review Comment: nit: You can leave this field in its original position, since moving it is not needed for this change. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None + @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None + + @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset + /* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark * equals the log end offset (which may never happen for a partition under consistent load). This is needed to * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark. */ - @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(logStartOffset) - - @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None - - @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset + @volatile private var highWatermarkMetadata: LogOffsetMetadata = new LogOffsetMetadata(_localLogStartOffset) Review Comment: This change has no functional effect, because `_localLogStartOffset` is initialized with `logStartOffset`. Still, it is good to keep using `_localLogStartOffset` here, for consistency and because it is the more relevant field. 
########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -318,6 +318,80 @@ class UnifiedLogTest { assertHighWatermark(4L) } + @Test + def testHighWatermarkMaintenanceForRemoteTopic(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, remoteLogStorageEnable = true) + val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true) + val leaderEpoch = 0 + + def assertHighWatermark(offset: Long): Unit = { + assertEquals(offset, log.highWatermark) + assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark) + } + + // High watermark initialized to 0 + assertHighWatermark(0L) + + var offset = 0L + for(_ <- 0 until 50) { + val records = TestUtils.singletonRecords("test".getBytes()) + val info = log.appendAsLeader(records, leaderEpoch) + offset = info.lastOffset + if (offset != 0 && offset % 10 == 0) + log.roll() + } + assertEquals(5, log.logSegments.size) + + // High watermark not changed by append + assertHighWatermark(0L) + + // Update high watermark as leader + log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L)) + assertHighWatermark(50L) + assertEquals(50L, log.logEndOffset) + + // Cannot update high watermark past the log end offset + log.updateHighWatermark(60L) + assertHighWatermark(50L) + + // simulate calls to upload 3 segments to remote storage and remove them from local-log. + log.updateHighestOffsetInRemoteStorage(30) + log.maybeIncrementLocalLogStartOffset(31L, LogStartOffsetIncrementReason.SegmentDeletion) + log.deleteOldSegments() + assertEquals(2, log.logSegments.size) + assertEquals(31L, log.localLogStartOffset()) + assertHighWatermark(50L) + + // simulate one remote-log segment deletion + val logStartOffset = 11L + log.maybeIncrementLogStartOffset(logStartOffset, LogStartOffsetIncrementReason.SegmentDeletion) + assertEquals(11, log.logStartOffset) + + // Updating the HW below the log-start-offset / local-log-start-offset is not allowed. 
HW should reset to local-log-start-offset. + log.updateHighWatermark(new LogOffsetMetadata(5L)) + assertHighWatermark(31L) + // Updating the HW between log-start-offset and local-log-start-offset is not allowed. HW should reset to local-log-start-offset. + log.updateHighWatermark(new LogOffsetMetadata(25L)) + assertHighWatermark(31L) + // Updating the HW between local-log-start-offset and log-end-offset is allowed. + log.updateHighWatermark(new LogOffsetMetadata(32L)) + assertHighWatermark(32L) + assertEquals(11L, log.logStartOffset) + assertEquals(31L, log.localLogStartOffset()) + + // Truncating the logs to below the local-log-start-offset, should update the high watermark Review Comment: Good to see the truncation scenarios covered as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org