satishd commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1546167963


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
= None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None

Review Comment:
   nit: You can leave it at the earliest place for this field as it is not 
really needed for this change. 



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -136,16 +136,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] 
= None
 
+  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
+
+  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+
   /* Keep track of the current high watermark in order to ensure that segments 
containing offsets at or above it are
    * not eligible for deletion. This means that the active segment is only 
eligible for deletion if the high watermark
    * equals the log end offset (which may never happen for a partition under 
consistent load). This is needed to
    * prevent the log start offset (which is exposed in fetch responses) from 
getting ahead of the high watermark.
    */
-  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new 
LogOffsetMetadata(logStartOffset)
-
-  @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
-
-  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+  @volatile private var highWatermarkMetadata: LogOffsetMetadata = new 
LogOffsetMetadata(_localLogStartOffset)

Review Comment:
   There won't be any effect with this change as `_localLogStartOffset` is 
initialized with `logStartOffset`. But it is good to keep 
`_localLogStartOffset` for consistency and relevance of this field. 



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -318,6 +318,80 @@ class UnifiedLogTest {
     assertHighWatermark(4L)
   }
 
+  @Test
+  def testHighWatermarkMaintenanceForRemoteTopic(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024, 
remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    val leaderEpoch = 0
+
+    def assertHighWatermark(offset: Long): Unit = {
+      assertEquals(offset, log.highWatermark)
+      assertValidLogOffsetMetadata(log, log.fetchOffsetSnapshot.highWatermark)
+    }
+
+    // High watermark initialized to 0
+    assertHighWatermark(0L)
+
+    var offset = 0L
+    for(_ <- 0 until 50) {
+      val records = TestUtils.singletonRecords("test".getBytes())
+      val info = log.appendAsLeader(records, leaderEpoch)
+      offset = info.lastOffset
+      if (offset != 0 && offset % 10 == 0)
+        log.roll()
+    }
+    assertEquals(5, log.logSegments.size)
+
+    // High watermark not changed by append
+    assertHighWatermark(0L)
+
+    // Update high watermark as leader
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(50L))
+    assertHighWatermark(50L)
+    assertEquals(50L, log.logEndOffset)
+
+    // Cannot update high watermark past the log end offset
+    log.updateHighWatermark(60L)
+    assertHighWatermark(50L)
+
+    // simulate calls to upload 3 segments to remote storage and remove them 
from local-log.
+    log.updateHighestOffsetInRemoteStorage(30)
+    log.maybeIncrementLocalLogStartOffset(31L, 
LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+    assertEquals(2, log.logSegments.size)
+    assertEquals(31L, log.localLogStartOffset())
+    assertHighWatermark(50L)
+
+    // simulate one remote-log segment deletion
+    val logStartOffset = 11L
+    log.maybeIncrementLogStartOffset(logStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)
+    assertEquals(11, log.logStartOffset)
+
+    // Updating the HW below the log-start-offset / local-log-start-offset is 
not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(5L))
+    assertHighWatermark(31L)
+    // Updating the HW between log-start-offset and local-log-start-offset is 
not allowed. HW should reset to local-log-start-offset.
+    log.updateHighWatermark(new LogOffsetMetadata(25L))
+    assertHighWatermark(31L)
+    // Updating the HW between local-log-start-offset and log-end-offset is 
allowed.
+    log.updateHighWatermark(new LogOffsetMetadata(32L))
+    assertHighWatermark(32L)
+    assertEquals(11L, log.logStartOffset)
+    assertEquals(31L, log.localLogStartOffset())
+
+    // Truncating the logs to below the local-log-start-offset, should update 
the high watermark

Review Comment:
   Good to see covering the truncation scenarios also.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to