kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1557089094


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > when will we set HWM to be lower than _localLogStartOffset?
   
   This can happen when recovering the partition after an ungraceful shutdown if the replication-offset-checkpoint file is missing or corrupted. When the broker comes online, the HWM is set to the localLogStartOffset in [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162), and then we load the HWM from the checkpoint file in [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495).
   
   If the HWM checkpoint file is missing or does not contain an entry for the partition, the default value of 0 is used. If 0 < LogStartOffset (LSO), the LSO is taken as the HWM. With remote storage the LSO can be lower than the localLogStartOffset (LLSO), so the high watermark can move backwards: a non-monotonic update from LLSO to LSO.
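   The sequence above can be replayed with a small Scala sketch that compares the old lower bound (log start offset) with the new one (local-log-start-offset). It is a hypothetical, simplified model, not Kafka's code: `HwmClampSketch`, `Offsets`, `clampBefore`, `clampAfter` and `recover` are illustrative names, offsets are plain `Long`s instead of `LogOffsetMetadata`, and recovery is reduced to two steps (start the HWM at the LLSO, then apply the checkpointed value, defaulting to 0).

```scala
// Hypothetical, simplified model of the high-watermark (HWM) clamp discussed above.
// Offsets are plain Longs; Kafka's LogOffsetMetadata and UnifiedLog are not used.
object HwmClampSketch {

  // Snapshot of the offsets relevant to the clamp (hypothetical type).
  final case class Offsets(logStartOffset: Long, localLogStartOffset: Long, logEndOffset: Long) {
    // With remote storage, the local log can start later than the overall log.
    require(logStartOffset <= localLogStartOffset && localLogStartOffset <= logEndOffset)
  }

  // Before the change: the lower bound is the log start offset (LSO in the comment).
  def clampBefore(suggested: Long, o: Offsets): Long =
    math.min(math.max(suggested, o.logStartOffset), o.logEndOffset)

  // After the change: the lower bound is the local-log-start-offset (LLSO).
  def clampAfter(suggested: Long, o: Offsets): Long =
    math.min(math.max(suggested, o.localLogStartOffset), o.logEndOffset)

  // Replays the simplified recovery: the HWM first starts at the LLSO, then the value
  // loaded from the checkpoint (0 when the entry is missing) is applied via `clamp`.
  // Returns (HWM before loading the checkpoint, HWM after loading it).
  def recover(o: Offsets, checkpointed: Option[Long], clamp: (Long, Offsets) => Long): (Long, Long) = {
    val initialHwm = o.localLogStartOffset
    val loadedHwm  = clamp(checkpointed.getOrElse(0L), o)
    (initialHwm, loadedHwm)
  }
}
```

   For example, with `Offsets(100L, 150L, 200L)` (LSO = 100, LLSO = 150, LEO = 200) and no checkpoint entry, `recover(o, None, clampBefore)` returns `(150, 100)`, so the HWM moves backwards, whereas `recover(o, None, clampAfter)` returns `(150, 150)`.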



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

