kamalcph commented on code in PR #15634:
URL: https://github.com/apache/kafka/pull/15634#discussion_r1563885907
##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -282,15 +282,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Update high watermark with offset metadata. The new high watermark will be lower
-   * bounded by the log start offset and upper bounded by the log end offset.
+   * bounded by the local-log-start-offset and upper bounded by the log-end-offset.
    *
    * @param highWatermarkMetadata the suggested high watermark with offset metadata
    * @return the updated high watermark offset
    */
   def updateHighWatermark(highWatermarkMetadata: LogOffsetMetadata): Long = {
     val endOffsetMetadata = localLog.logEndOffsetMetadata
-    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < logStartOffset) {
-      new LogOffsetMetadata(logStartOffset)
+    val newHighWatermarkMetadata = if (highWatermarkMetadata.messageOffset < _localLogStartOffset) {

Review Comment:
> In the rare case, the restarted broker is elected as the leader before caught up through unclean election. Is this the case that you want to address?

Yes, we want to address this case too. And the issue can also happen during a clean preferred-leader-election:

```
Call stack:

The replica (1002) has the full data, but its HW is invalid, so its fetch-offset will be equal to LeaderLog(1001).highWatermark.

Leader (1001):

KafkaApis.handleFetchRequest
ReplicaManager.fetchMessages
ReplicaManager.readFromLocalLog
Partition.fetchRecords
Partition.updateFollowerFetchState
Partition.maybeExpandIsr
Partition.submitAlterPartition
...
...
...

# If there is not enough data to respond and there is no remote data, we let the fetch request wait for new data,
# i.e. the request is parked in the DelayedFetchPurgatory.

Another thread runs Preferred-Leader-Election in the controller (1003). Since replica 1002 has joined the ISR list,
it can be elected as the preferred leader. The controller sends LeaderAndIsr requests to all the brokers.

KafkaController.processReplicaLeaderElection
KafkaController.onReplicaElection
PartitionStateMachine.handleStateChanges
PartitionStateMachine.doHandleStateChanges
PartitionStateMachine.electLeaderForPartitions
ControllerChannelManager.sendRequestsToBrokers

Replica 1002 gets elected as leader but still has an invalid highWatermark because it hasn't processed the
fetch response from the previous leader 1001, so it throws an OFFSET_OUT_OF_RANGE error while processing the
LeaderAndIsr request. Note that if the LeaderAndIsr request fails for one partition, the remaining partitions
in that request won't be processed.

KafkaApis.handleLeaderAndIsrRequest
ReplicaManager.becomeLeaderOrFollower
ReplicaManager.makeLeaders
Partition.makeLeader
Partition.maybeIncrementLeaderHW
UnifiedLog.maybeIncrementHighWatermark (LeaderLog)
UnifiedLog.fetchHighWatermarkMetadata

The controller assumes that the current leader for tp0 is 1002, but broker 1002 couldn't process the
LeaderAndIsr request. The controller retries the LeaderAndIsr request until the broker becomes the leader
for tp0. During this time, producers can't send messages, because node 1002 returns the
NOT_LEADER_FOR_PARTITION error code to the producer.
```
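For illustration, here is a minimal, self-contained sketch of the clamping behaviour described in the updated javadoc: the suggested high watermark is lower-bounded by the local-log-start-offset and upper-bounded by the log-end-offset. `LogOffsets` and `clampHighWatermark` are hypothetical stand-ins with offsets reduced to plain `Long`s, not the actual `UnifiedLog`/`LogOffsetMetadata` API:

```scala
// Minimal sketch, not Kafka code: offsets are plain Longs instead of LogOffsetMetadata,
// and clampHighWatermark is a hypothetical stand-in for UnifiedLog.updateHighWatermark.
object HighWatermarkClampSketch {

  // Simplified view of the offsets the real log tracks.
  final case class LogOffsets(localLogStartOffset: Long, logEndOffset: Long)

  // The new high watermark is lower-bounded by the local-log-start-offset
  // and upper-bounded by the log-end-offset.
  def clampHighWatermark(proposedHw: Long, offsets: LogOffsets): Long =
    if (proposedHw < offsets.localLogStartOffset) offsets.localLogStartOffset
    else if (proposedHw > offsets.logEndOffset) offsets.logEndOffset
    else proposedHw

  def main(args: Array[String]): Unit = {
    // Tiered-storage style layout: older segments exist only in remote storage,
    // so the local log starts at 100 even though the log-start-offset may still be 0.
    val offsets = LogOffsets(localLogStartOffset = 100L, logEndOffset = 250L)

    println(clampHighWatermark(40L, offsets))  // 100 -> a stale HW is pulled up to the local-log-start-offset
    println(clampHighWatermark(180L, offsets)) // 180 -> an in-range HW is kept as-is
    println(clampHighWatermark(400L, offsets)) // 250 -> capped at the log-end-offset
  }
}
```

With the previous lower bound (the log-start-offset, which can be 0 and live entirely in remote storage), a stale HW like 40 would have been accepted as-is, leaving the high watermark below the first locally available offset, which is the invalid-HW state the call stack above runs into.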