hachikuji commented on code in PR #12489:
URL: https://github.com/apache/kafka/pull/12489#discussion_r940721941


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1002,47 +1002,59 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
 
     val controller = getController().kafkaController
     val leaderIsrAndControllerEpochMap = 
zkClient.getTopicPartitionStates(Seq(tp))
-    val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val oldLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val newIsr = List(oldLeaderAndIsr.leader)
+    val newPartitionEpoch = oldLeaderAndIsr.partitionEpoch + 1
     val topicId = controller.controllerContext.topicIds(tp.topic)
     val brokerId = otherBroker.config.brokerId
     val brokerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(otherBroker.config.brokerId)
 
-    // When re-sending the current ISR, we should not get and error or any ISR 
changes
-    val alterPartitionRequest = new AlterPartitionRequestData()
-      .setBrokerId(brokerId)
-      .setBrokerEpoch(brokerEpoch)
-      .setTopics(Seq(new AlterPartitionRequestData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setNewIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+    def sendAndVerifyAlterPartitionResponse(requestPartitionEpoch: Int): Unit 
= {
+      val alterPartitionRequest = new AlterPartitionRequestData()
+        .setBrokerId(brokerId)
+        .setBrokerEpoch(brokerEpoch)
+        .setTopics(Seq(new AlterPartitionRequestData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(requestPartitionEpoch)
+            .setNewIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
 
-    val future = new CompletableFuture[AlterPartitionResponseData]()
-    controller.eventManager.put(AlterPartitionReceived(
-      alterPartitionRequest,
-      AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
-      future.complete
-    ))
+      val future = new CompletableFuture[AlterPartitionResponseData]()
+      controller.eventManager.put(AlterPartitionReceived(
+        alterPartitionRequest,
+        AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION,
+        future.complete
+      ))
 
-    val expectedAlterPartitionResponse = new AlterPartitionResponseData()
-      .setTopics(Seq(new AlterPartitionResponseData.TopicData()
-        .setTopicId(topicId)
-        .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
-          .setPartitionIndex(tp.partition)
-          .setLeaderId(brokerId)
-          .setLeaderEpoch(newLeaderAndIsr.leaderEpoch)
-          .setPartitionEpoch(newLeaderAndIsr.partitionEpoch)
-          .setIsr(newLeaderAndIsr.isr.map(Int.box).asJava)
-          .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value)
+      // When re-sending an ISR update, we should not get and error or any ISR 
changes
+      val expectedAlterPartitionResponse = new AlterPartitionResponseData()
+        .setTopics(Seq(new AlterPartitionResponseData.TopicData()
+          .setTopicId(topicId)
+          .setPartitions(Seq(new AlterPartitionResponseData.PartitionData()
+            .setPartitionIndex(tp.partition)
+            .setLeaderId(brokerId)
+            .setLeaderEpoch(oldLeaderAndIsr.leaderEpoch)
+            .setPartitionEpoch(newPartitionEpoch)
+            .setIsr(newIsr.map(Int.box).asJava)
+            .setLeaderRecoveryState(oldLeaderAndIsr.leaderRecoveryState.value)
+          ).asJava)
         ).asJava)
-      ).asJava)
+      assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
+    }
 
-    assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
+    // send a request, expect the partition epoch to be incremented
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+
+    // re-send the same request with various partition epochs 
(less/equal/greater than the current
+    // epoch), expect it to succeed while the partition epoch remains the same
+    sendAndVerifyAlterPartitionResponse(oldLeaderAndIsr.partitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch)
+    sendAndVerifyAlterPartitionResponse(newPartitionEpoch + 1)

Review Comment:
   I do find it a little odd that the partition epoch is ignored completely 
when the ISR matches the desired state. We do have the leader epoch check, so 
at least we can ensure that an old leader won't be misled into thinking that 
its change was successfully applied. But what about the case where the request 
is sent to an old controller? Consider a scenario like this:
   
   1. Controller A has leader=1, isr=[1,2], partition epoch=10
   2. Controller B is elected
   3. Leader sends AlterPartition(epoch=10) to B to remove 2 from ISR => 
partition epoch = 11
   4. Leader sends AlterPartition(epoch=11) to A to add 2 back to the ISR => A 
accepts because isr=[1,2] already matches its stale state, but there is no 
partition epoch bump
   
   I think this case is ruled out because the leader would have to find the new 
controller and then go back to the old one. The controller epoch would probably 
catch that case. What if we add a restart between steps 3 and 4? Would it be 
possible to find the old controller after restarting? Probably not, but I think 
I'd sleep better if we at least rejected requests where the partition epoch is 
greater than what the controller has in its cache. Does that make sense?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to