dajac commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941077938


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
-  @Test
-  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrNotExpandedIfReplicaIsFenced(quorum: String): Unit = {

Review Comment:
   nit: Should we update the test name as well?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is 
alive and not shutting down.

Review Comment:
   This basically means that the leader will keep retrying to add the 
shutting-down broker back to the ISR until that broker is removed from 
the metadata cache. It is worth noting that, during this time, other replicas 
cannot be added back to the ISR either. The controller rejects any ISR expansion 
containing at least one ineligible replica. This is why we added the 
in-controlled-shutdown state in KRaft. It allows the leader to filter such 
replicas out as soon as possible.
   
   This may be acceptable here. Otherwise, we would have to propagate the 
shutting-down brokers via the UpdateMetadataRequest. What do others think?
   



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = 
controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else 
requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))

Review Comment:
   nit: This piece of code is used in multiple places now. I wonder if it is 
worth pulling it out into a helper method. What do you think?



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = 
assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = 
controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = 
controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        
.setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else 
requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))
+
+    val error = if (alterPartitionVersion > 1) Errors.INELIGIBLE_REPLICA else 
Errors.OPERATION_NOT_ATTEMPTED
+    val responseTopic = new AlterPartitionResponseData.TopicData()

Review Comment:
   nit: Should we use `expectedError` and `expectedResponseTopic`?



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -2363,7 +2363,23 @@ class KafkaController(val config: KafkaConfig,
             )
             None
           } else {
-            Some(tp -> newLeaderAndIsr)
+            // Pull out replicas being added to ISR and verify they are all 
online

Review Comment:
   nit: Add a `.` at the end of this sentence.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled 
shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is 
alive and not shutting down.
       case kRaftMetadataCache: KRaftMetadataCache =>
         !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
           !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
 
+      case zkMetadataCache: ZkMetadataCache =>

Review Comment:
   nit: Could we add the part of the comment related to ZK above this line?



##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends 
QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, 
TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))

Review Comment:
   nit: You could use `@ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to