dajac commented on code in PR #13765: URL: https://github.com/apache/kafka/pull/13765#discussion_r1214750188
########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -1091,8 +1091,10 @@ class Partition(val topicPartition: TopicPartition, // Note here we are using the "maximal", see explanation above val replicaState = replica.stateSnapshot if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset && - (replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) - || partitionState.maximalIsr.contains(replica.brokerId))) { + ((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) && + isReplicaIsrEligible(replica.brokerId)) || Review Comment: Is it worth extracting this condition into an helper method (e.g. isIsrEligibleAndCaughtUp)? That would simplify the condition. ########## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ########## @@ -1456,6 +1456,105 @@ class PartitionTest extends AbstractPartitionTest { assertEquals(alterPartitionListener.failures.get, 1) } + @ParameterizedTest + @ValueSource(strings = Array("fenced", "shutdown", "unfenced")) + def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = { Review Comment: nit: s/HWM/HighWatermark? ########## core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala: ########## @@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = { Review Comment: nit: `*ShouldNotTimeout`? it would be great to capture the issue in the test name or to add a comment about it. ########## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ########## @@ -260,11 +265,17 @@ private void tryElection(PartitionChangeRecord record) { * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of * case 1. In this function, we check for cases 2 and 3, and handle them by manually * setting record.leader to the current leader. + * + * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager + * that required that the leader epoch be bump whenever the ISR shrank. In MV 3.6 this leader + * bump is not required when the ISR shrinks. */ void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) { Review Comment: For my understanding, do we bump the leader epoch when the ISR is expanded? My understanding is that we don't. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org