junrao commented on code in PR #12487: URL: https://github.com/apache/kafka/pull/12487#discussion_r939547038
########## core/src/main/scala/kafka/cluster/Partition.scala: ########## @@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition, private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = { metadataCache match { // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are - // allowed to join the ISR. This does not apply to ZK mode. + // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down. Review Comment: In ControllerChannelManager.sendUpdateMetadataRequests(), it seems that we include shutting-down brokers in liveBrokers. So, we won't know whether a remote broker is shutting down. ########## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ########## @@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) } + @Test + def testShutdownBrokerNotAddedToIsr(): Unit = { + servers = makeServers(2) + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + val otherBroker = servers.find(_.config.brokerId != controllerId).get + val brokerId = otherBroker.config.brokerId + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(controllerId, brokerId)) + val fullIsr = List(controllerId, brokerId) + TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) + + // Shut down follower. 
+ servers(brokerId).shutdown() + servers(brokerId).awaitShutdown() + + val controller = getController().kafkaController + val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) + val newLeaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr + val topicId = controller.controllerContext.topicIds(tp.topic) + val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId) + + // We expect only the controller (online broker) to be in ISR + assertEquals(List(controllerId), newLeaderAndIsr.isr) + + // Try to update ISR to contain the offline broker. + val alterPartitionRequest = new AlterPartitionRequestData() + .setBrokerId(controllerId) + .setBrokerEpoch(controllerEpoch) + .setTopics(Seq(new AlterPartitionRequestData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionRequestData.PartitionData() + .setPartitionIndex(tp.partition) + .setLeaderEpoch(newLeaderAndIsr.leaderEpoch) + .setPartitionEpoch(newLeaderAndIsr.partitionEpoch) + .setNewIsr(fullIsr.map(Int.box).asJava) + .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + ).asJava) + ).asJava) + + val future = new CompletableFuture[AlterPartitionResponseData]() + controller.eventManager.put(AlterPartitionReceived( + alterPartitionRequest, + AlterPartitionRequestData.HIGHEST_SUPPORTED_VERSION, + future.complete + )) + + // We expect an ineligible replica error response for the partition. 
+ val expectedAlterPartitionResponse = new AlterPartitionResponseData() + .setTopics(Seq(new AlterPartitionResponseData.TopicData() + .setTopicId(topicId) + .setPartitions(Seq(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(tp.partition) + .setErrorCode(Errors.INELIGIBLE_REPLICA.code()) + .setLeaderRecoveryState(newLeaderAndIsr.leaderRecoveryState.value) + ).asJava) + ).asJava) + + assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) + assertEquals(List(controllerId), newLeaderAndIsr.isr) Review Comment: Should we read leaderAndIsr from the controller again? ########## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ########## @@ -1045,6 +1044,73 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) } + @Test + def testShutdownBrokerNotAddedToIsr(): Unit = { + servers = makeServers(2) + val controllerId = TestUtils.waitUntilControllerElected(zkClient) + val otherBroker = servers.find(_.config.brokerId != controllerId).get + val brokerId = otherBroker.config.brokerId + val tp = new TopicPartition("t", 0) + val assignment = Map(tp.partition -> Seq(controllerId, brokerId)) + val fullIsr = List(controllerId, brokerId) + TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers) + + // Shut down follower. + servers(brokerId).shutdown() + servers(brokerId).awaitShutdown() + + val controller = getController().kafkaController + val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp)) Review Comment: Could we just get the leaderAndIsr from the controller? It's cheaper than reading from ZK. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org