dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1440196006


##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
 
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => 
servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be 
suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-    val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
 
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, 
expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, 
ReplicaDeletionSuccessful)
-      val offlineReplicas = 
controller.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union 
offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either 
ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled 
= true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => 
expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => 
partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be 
suspended
+      val leaderIdOpt = 
TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != 
leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
 
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
+      // create the topic
+      TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, 
partitionHostingServers)
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => 
partitionHostingServers.forall(_.getLogManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be 
suspended
+      val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+      assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
+      val follower = partitionHostingServers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
+      follower.shutdown()
+      // start topic deletion
+      adminZkClient.deleteTopic(topic)
+
+      // make sure deletion of all of the topic's replicas have been tried
+      ensureControllerExists()
+      val (controller, controllerId) = getController()
+      val allReplicasForTopic = getAllReplicasFromAssignment(topic, 
expectedReplicaAssignment)
+      TestUtils.waitUntilTrue(() => {
+        val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, 
ReplicaDeletionSuccessful)
+        val offlineReplicas = 
controller.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
+        allReplicasForTopic == (replicasInDeletionSuccessful union 
offlineReplicas)
+      }, s"Not all replicas for topic $topic are in states of either 
ReplicaDeletionSuccessful or OfflineReplica")
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(partitionHostingServers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
       }
+      // trigger a controller switch now
+      val previousControllerId = controllerId
+
+      controller.shutdown()
+
+      ensureControllerExists()
+      // wait until a new controller to show up
+      TestUtils.waitUntilTrue(() => {
+        val (newController, newControllerId) = getController()
+        newControllerId != previousControllerId
+      }, "The new controller should not have the failed controller id")
+
+      // bring back the failed brokers
+      follower.startup()
+      controller.startup()
+      TestUtils.verifyTopicDeletion(zkClient, topic, 2, 
partitionHostingServers)
     }
-    // trigger a controller switch now
-    val previousControllerId = controllerId
-
-    controller.shutdown()
-
-    ensureControllerExists()
-    // wait until a new controller to show up
-    TestUtils.waitUntilTrue(() => {
-      val (newController, newControllerId) = getController()
-      newControllerId != previousControllerId
-    }, "The new controller should not have the failed controller id")
-
-    // bring back the failed brokers
-    follower.startup()
-    controller.startup()
-    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
   }
 
-
-  @Test
-  def testDeleteTopicDuringAddPartition(): Unit = {
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic)
-    val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 
0))
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-    val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicDuringAddPartition(quorum: String): Unit = {
+    brokers = createTestTopicAndCluster(topic)
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(brokers, new 
TopicPartition(topic, 0))
+    val follower = brokers.filter(_.config.brokerId != leaderIdOpt).last
     val newPartition = new TopicPartition(topic, 1)
-    // capture the brokers before we shutdown so that we don't fail validation 
in `addPartitions`
-    val brokers = adminZkClient.getBrokerMetadatas()
-    follower.shutdown()
-    // wait until the broker has been removed from ZK to reduce non-determinism
-    TestUtils.waitUntilTrue(() => 
zkClient.getBroker(follower.config.brokerId).isEmpty,
-      s"Follower ${follower.config.brokerId} was not removed from ZK")
-    // add partitions to topic
-    adminZkClient.addPartitions(topic, expectedReplicaFullAssignment, brokers, 
2,
-      Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
+
+    if (isKRaftTest()) {

Review Comment:
   We have placed the code from both modes in two different branches, which is 
no different from the two methods. Can we try to ensure that the same code is 
used in both modes as much as possible. for example, the 2 branches both have a 
`follower.shutdown()` and both call `increasePartitions`, we can move it out of 
it/else and move `adminZkClient.addPartitions` into 
`TestUtils.increasePartitions`, WDYT.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to