tinaselenge commented on code in PR #14846: URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951
########## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ########## @@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness { }.toSet } - @Test - def testIncreasePartitionCountDuringDeleteTopic(): Unit = { - val topic = "test" + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = { val topicPartition = new TopicPartition(topic, 0) - val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) - brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) - // create brokers - val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) - this.servers = allServers - val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) - // create the topic - TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers) - // wait until replica log is created on every broker - TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager.getLog(topicPartition).isDefined), - "Replicas for topic test not created.") - // shutdown a broker to make sure the following topic deletion will be suspended - val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition) - assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]") - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last - follower.shutdown() - // start topic deletion - adminZkClient.deleteTopic(topic) - // make sure deletion of all of the topic's replicas have been tried - ensureControllerExists() - val (controller, controllerId) = getController() - val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment) - TestUtils.waitUntilTrue(() => { - val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) - val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica) - allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas) - }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica") + if (isKRaftTest()) { + val topicPartition = new TopicPartition(topic, 0) + val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled = true) + this.brokers = allBrokers + val partitionHostingBrokers = allBrokers.filter(b => expectedReplicaAssignment(0).contains(b.config.brokerId)) + + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined), + "Replicas for topic test not created.") + + // shutdown a broker to make sure the following topic deletion will be suspended + val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition) + val follower = partitionHostingBrokers.filter(s => s.config.brokerId != leaderIdOpt).last + follower.shutdown() + // start topic deletion + admin.deleteTopics(Collections.singletonList(topic)).all().get() + + // increase the partition count for topic + val props = new Properties() + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingBrokers)) + TestUtils.resource(Admin.create(props)) { adminClient => + try { + adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get() + } catch { + case _: ExecutionException => + } + } - // increase the partition count for topic - val props = new Properties() - props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(servers)) - TestUtils.resource(Admin.create(props)) { adminClient => - try { - adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get() - } catch { - case _: ExecutionException => + // bring back the failed broker + follower.startup() + TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers) + } else { + val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false) + brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true")) + // create brokers + val allServers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b))) + this.servers = allServers + val partitionHostingServers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId)) + // create the topic + TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, partitionHostingServers) + // wait until replica log is created on every broker + TestUtils.waitUntilTrue(() => partitionHostingServers.forall(_.getLogManager.getLog(topicPartition).isDefined), + "Replicas for topic test not created.") + + // shutdown a broker to make sure the following topic deletion will be suspended + val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition) + assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]") + val follower = partitionHostingServers.filter(s => s.config.brokerId != leaderIdOpt.get).last + follower.shutdown() + // start topic deletion + adminZkClient.deleteTopic(topic) + + // make sure deletion of all of the topic's replicas have been tried + ensureControllerExists() + val (controller, controllerId) = getController() + val allReplicasForTopic = getAllReplicasFromAssignment(topic, expectedReplicaAssignment) + TestUtils.waitUntilTrue(() => { + val replicasInDeletionSuccessful = controller.kafkaController.controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) + val offlineReplicas = controller.kafkaController.controllerContext.replicasInState(topic, OfflineReplica) + allReplicasForTopic == (replicasInDeletionSuccessful union offlineReplicas) + }, s"Not all replicas for topic $topic are in states of either ReplicaDeletionSuccessful or OfflineReplica") + + // increase the partition count for topic + val props = new Properties() + props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.plaintextBootstrapServers(partitionHostingServers)) + TestUtils.resource(Admin.create(props)) { adminClient => + try { + adminClient.createPartitions(Map(topic -> NewPartitions.increaseTo(2)).asJava).all().get() + } catch { + case _: ExecutionException => + } } + // trigger a controller switch now + val previousControllerId = controllerId + + controller.shutdown() + + ensureControllerExists() + // wait until a new controller to show up + TestUtils.waitUntilTrue(() => { + val (newController, newControllerId) = getController() + newControllerId != previousControllerId + }, "The new controller should not have the failed controller id") + + // bring back the failed brokers + follower.startup() + controller.startup() + TestUtils.verifyTopicDeletion(zkClient, topic, 2, partitionHostingServers) } - // trigger a controller switch now - val previousControllerId = controllerId - - controller.shutdown() - - ensureControllerExists() - // wait until a new controller to show up - TestUtils.waitUntilTrue(() => { - val (newController, newControllerId) = getController() - newControllerId != previousControllerId - }, "The new controller should not have the failed controller id") - - // bring back the failed brokers - follower.startup() - controller.startup() - TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers) } - - @Test - def testDeleteTopicDuringAddPartition(): Unit = { - val topic = "test" - servers = createTestTopicAndCluster(topic) - val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) - assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]") - val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteTopicDuringAddPartition(quorum: String): Unit = { + brokers = createTestTopicAndCluster(topic) + val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(brokers, new TopicPartition(topic, 0)) + val follower = brokers.filter(_.config.brokerId != leaderIdOpt).last val newPartition = new TopicPartition(topic, 1) - // capture the brokers before we shutdown so that we don't fail validation in `addPartitions` - val brokers = adminZkClient.getBrokerMetadatas() - follower.shutdown() - // wait until the broker has been removed from ZK to reduce non-determinism - TestUtils.waitUntilTrue(() => zkClient.getBroker(follower.config.brokerId).isEmpty, - s"Follower ${follower.config.brokerId} was not removed from ZK") - // add partitions to topic - adminZkClient.addPartitions(topic, expectedReplicaFullAssignment, brokers, 2, - Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2)))) + + if (isKRaftTest()) { Review Comment: The type of `follower` object is different, depending on which mode. In KRaft it is KafkaBroker and in ZK it is KafkaServer, even though they both support shutdown() method, I'm not sure we can define this object with different types. I did try this, but ended up with many if statements and it didn't quite work. So I placed them into only two branches since the only common code was increasePartitions(). I'm happy to move increasePartitions() out of the if else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org