This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 62c2880a584 KAFKA-14144:; Compare AlterPartition LeaderAndIsr before fencing partition epoch (#12489) 62c2880a584 is described below commit 62c2880a584dee52af52d74d7f0ec44475528c72 Author: David Mao <47232755+sple...@users.noreply.github.com> AuthorDate: Tue Aug 9 08:55:38 2022 -0700 KAFKA-14144:; Compare AlterPartition LeaderAndIsr before fencing partition epoch (#12489) This PR fixes an AlterPartition regression introduced in https://github.com/apache/kafka/pull/12032. When an AlterPartition request succeeds, the partition epoch gets bumped. In ZK controller mode, the sender also relies on the AlterPartition response to be informed of the new partition epoch. If the sender times out the request before a response is sent, the sender will have a stale partition epoch compared to the ZK controller state and will be fenced on subsequent AlterPartition request attempts. The sender will not receive an updated partition epoch until it receives a LeaderAndIsr request for controller-initiated ISR changes. 
Reviewers: Jason Gustafson <ja...@confluent.io> --- core/src/main/scala/kafka/api/LeaderAndIsr.scala | 5 +- .../scala/kafka/controller/KafkaController.scala | 11 +-- .../controller/ControllerIntegrationTest.scala | 82 +++++++++++++--------- 3 files changed, 58 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala index dd1d381a144..da68cdb479c 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala @@ -62,7 +62,7 @@ case class LeaderAndIsr( if (leader == LeaderAndIsr.NoLeader) None else Some(leader) } - def equalsIgnorePartitionEpoch(other: LeaderAndIsr): Boolean = { + def equalsAllowStalePartitionEpoch(other: LeaderAndIsr): Boolean = { if (this == other) { true } else if (other == null) { @@ -71,7 +71,8 @@ case class LeaderAndIsr( leader == other.leader && leaderEpoch == other.leaderEpoch && isr.equals(other.isr) && - leaderRecoveryState == other.leaderRecoveryState + leaderRecoveryState == other.leaderRecoveryState && + partitionEpoch <= other.partitionEpoch } } diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 8d16eb7e1da..d179bcd6ca4 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -2339,14 +2339,15 @@ class KafkaController(val config: KafkaConfig, if (newLeaderAndIsr.leaderEpoch != currentLeaderAndIsr.leaderEpoch) { partitionResponses(tp) = Left(Errors.FENCED_LEADER_EPOCH) None - } else if (newLeaderAndIsr.partitionEpoch < currentLeaderAndIsr.partitionEpoch) { - partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION) - None - } else if (newLeaderAndIsr.equalsIgnorePartitionEpoch(currentLeaderAndIsr)) { + } else if (newLeaderAndIsr.equalsAllowStalePartitionEpoch(currentLeaderAndIsr)) { // If a partition is already in the desired 
state, just return it + // this check must be done before fencing based on partition epoch to maintain idempotency partitionResponses(tp) = Right(currentLeaderAndIsr) None - } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) { + } else if (newLeaderAndIsr.partitionEpoch != currentLeaderAndIsr.partitionEpoch) { + partitionResponses(tp) = Left(Errors.INVALID_UPDATE_VERSION) + None + } else if (newLeaderAndIsr.leaderRecoveryState == LeaderRecoveryState.RECOVERING && newLeaderAndIsr.isr.length > 1) { partitionResponses(tp) = Left(Errors.INVALID_REQUEST) info( s"Rejecting AlterPartition from node $brokerId for $tp because leader is recovering and ISR is greater than 1: " + diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 57cbeafd4d0..76188894371 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -1002,47 +1002,58 @@ class ControllerIntegrationTest extends QuorumTestHarness { val controller = getController().kafkaController val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) - val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr + val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr + val newIsr = List(oldLeaderAndIsr.leader) + val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1 val topicId = controller.controllerContext.topicIds(tp.topic) val brokerId = otherBroker.config.brokerId val brokerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId) - // When re-sending the current ISR, we should not get and error or any ISR changes - val alterPartitionRequest = new AlterPartitionRequestData() - .setBrokerId(brokerId) - .setBrokerEpoch(brokerEpoch) - .setTopics(Seq(new 
AlterPartitionRequestData.TopicData() - .setTopicId(topicId) - .setPartitions(Seq(new AlterPartitionRequestData.PartitionData() - .setPartitionIndex(tp.partition) - .setLeaderEpoch(newLeaderAndIsr.leaderEpoch) - .setPartitionEpoch(newLeaderAndIsr.partitionEpoch) - .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava) - .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit = { + val alterPartitionRequest = new AlterPartitionRequestData() + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch) + .setTopics(Seq(new AlterPartitionRequestData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionRequestData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch) + .setPartitionEpoch(requestPartitionEpoch) + .setNewIsr(newIsr.map(Int.box).asJava) + .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value) + ).asJava) ).asJava) - ).asJava) - val future = new CompletableFuture[AlterPartitionResponseData]() - controller.eventManager.put(AlterPartitionReceived( - alterPartitionRequest, - AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION, - future.complete - )) + val future = new CompletableFuture[AlterPartitionResponseData]() + controller.eventManager.put(AlterPartitionReceived( + alterPartitionRequest, + AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION, + future.complete + )) - val expectedAlterPartitionResponse = new AlterPartitionResponseData() - .setTopics(Seq(new AlterPartitionResponseData.TopicData() - .setTopicId(topicId) - .setPartitions(Seq(new AlterPartitionResponseData.PartitionData() - .setPartitionIndex(tp.partition) - .setLeaderId(brokerId) - .setLeaderEpoch(newLeaderAndIsr.leaderEpoch) - .setPartitionEpoch(newLeaderAndIsr.partitionEpoch) - .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava) - .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + // When re-sending an ISR update, we should 
not get and error or any ISR changes + val expectedAlterPartitionResponse = new AlterPartitionResponseData() + .setTopics(Seq(new AlterPartitionResponseData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderId(brokerId) + .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch) + .setPartitionEpoch(newPartitionEpoch) + .setIsr(newIsr.map(Int.box).asJava) + .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value) + ).asJava) ).asJava) - ).asJava) + assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) + } - assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) + // send a request, expect the partition epoch to be incremented + sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch) + + // re-send the same request with various partition epochs (less/equal/greater than the current + // epoch), expect it to succeed while the partition epoch remains the same + sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch) + sendAndVerifyAlterPartitionResponse(newPartitionEpoch) } @Test @@ -1100,7 +1111,6 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertAlterPartition( partitionError = Errors.UNKNOWN_TOPIC_ID, - topicPartition = tp, topicIdOpt = Some(Uuid.randomUuid()) ) @@ -1118,9 +1128,15 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertAlterPartition( partitionError = Errors.INVALID_UPDATE_VERSION, + isr = Set(leaderId), partitionEpoch = partitionEpoch - 1 ) + assertAlterPartition( + partitionError = Errors.INVALID_UPDATE_VERSION, + partitionEpoch = partitionEpoch + 1 + ) + assertAlterPartition( partitionError = Errors.FENCED_LEADER_EPOCH, leaderEpoch = leaderEpoch - 1