kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
   > In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?
   
   Yes, we want to address this case too. The issue can also happen during a clean preferred-leader election:
   
   ```
   Call stack: the replica (1002) has the full data but its HW is invalid; its fetch offset will then be equal to LeaderLog(1001).highWatermark.
   
   Leader (1001):
       KafkaApis.handleFetchRequest
           ReplicaManager.fetchMessages
               ReplicaManager.readFromLocalLog
                   Partition.fetchRecords
                       Partition.updateFollowerFetchState
                           Partition.maybeExpandIsr
                               Partition.submitAlterPartition
                               ...
                               ...
                               ...
           # If there is not enough data to respond and there is no remote data, the fetch request waits for new data:
           # it is parked in the DelayedFetchPurgatory.
   
   
   Another thread runs the preferred-leader election in the controller (1003). Since replica 1002 has joined the ISR, it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.
   
       KafkaController.processReplicaLeaderElection
           KafkaController.onReplicaElection
               PartitionStateMachine.handleStateChanges
                   PartitionStateMachine.doHandleStateChanges
                       PartitionStateMachine.electLeaderForPartitions
                   ControllerChannelManager.sendRequestsToBrokers
   
   
   Replica 1002 gets elected as the leader and has an invalid highWatermark, since it didn't process the fetch response from the previous leader 1001. It throws an OFFSET_OUT_OF_RANGE error when processing the LeaderAndIsr request. Note that in a LeaderAndIsr request, if even one partition fails, the remaining partitions in that request won't be processed.
   
       KafkaApis.handleLeaderAndIsrRequest
           ReplicaManager.becomeLeaderOrFollower
               ReplicaManager.makeLeaders
                   Partition.makeLeader
                       Partition.maybeIncrementLeaderHW
                           UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
                               UnifiedLog.fetchHighWatermarkMetadata
   
   
   The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the LISR. The controller retries the LISR until broker 1002 becomes the leader for tp0. During this time, producers can't send messages, because node 1002 returns the NOT_LEADER_FOR_PARTITION error code to them.
   
   During this time, if a follower sends a FETCH request to the current leader 1002, the leader returns an OFFSET_OUT_OF_RANGE error:
   
       KafkaApis.handleFetchRequest
           ReplicaManager.fetchMessages
               ReplicaManager.readFromLog
                   Partition.fetchRecords
                       Partition.readRecords
                           UnifiedLog.read 
                               UnifiedLog.fetchHighWatermarkMetadata
                                   UnifiedLog.convertToOffsetMetadataOrThrow
                                       LocalLog.convertToOffsetMetadataOrThrow
                                           LocalLog.read
   
   ```
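To make the effect of the changed lower bound concrete, here is a minimal Scala sketch. `ToyLog`, its methods, `HighWatermarkBoundSketch`, and the offsets are hypothetical and are not Kafka's `UnifiedLog` API. It assumes tiered storage, where offsets between the log-start-offset and the local-log-start-offset exist only remotely, and it reduces `LocalLog.convertToOffsetMetadataOrThrow` to a plain range check.

```scala
// Hypothetical, simplified model for illustration only; this is NOT Kafka's UnifiedLog.
// Assumes tiered storage: offsets in [logStartOffset, localLogStartOffset) exist only remotely.
final case class ToyLog(logStartOffset: Long, localLogStartOffset: Long, logEndOffset: Long) {

  // Old lower bound: clamp a suggested HW to [logStartOffset, logEndOffset].
  def boundedByLogStart(suggested: Long): Long =
    math.min(math.max(suggested, logStartOffset), logEndOffset)

  // New lower bound (as in the diff): clamp to [localLogStartOffset, logEndOffset].
  def boundedByLocalLogStart(suggested: Long): Long =
    math.min(math.max(suggested, localLogStartOffset), logEndOffset)

  // Stand-in for LocalLog.convertToOffsetMetadataOrThrow: only offsets backed by a
  // local segment (or the log end offset) can be resolved; anything else is out of range.
  def resolveLocally(offset: Long): Either[String, Long] =
    if (offset < localLogStartOffset || offset > logEndOffset) Left("OFFSET_OUT_OF_RANGE")
    else Right(offset)
}

object HighWatermarkBoundSketch {
  def main(args: Array[String]): Unit = {
    // Assumed offsets: segments below 500 were uploaded to remote storage and deleted locally.
    val log = ToyLog(logStartOffset = 0L, localLogStartOffset = 500L, logEndOffset = 1000L)
    val staleHw = 100L // hypothetical invalid HW on the restarted replica

    val oldHw = log.boundedByLogStart(staleHw)
    val newHw = log.boundedByLocalLogStart(staleHw)
    println(s"old bound: hw=$oldHw -> ${log.resolveLocally(oldHw)}") // old bound: hw=100 -> Left(OFFSET_OUT_OF_RANGE)
    println(s"new bound: hw=$newHw -> ${log.resolveLocally(newHw)}") // new bound: hw=500 -> Right(500)
  }
}
```

Under these assumptions, the old bound leaves the stale HW at 100, which the local log cannot resolve, while the new bound moves it to 500, the first locally available offset.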



-- 
This is an automated message from the Apache Git Service.