tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1441637951


##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -187,164 +197,233 @@ class DeleteTopicTest extends QuorumTestHarness {
     }.toSet
   }
 
-  @Test
-  def testIncreasePartitionCountDuringDeleteTopic(): Unit = {
-    val topic = "test"
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIncreasePartitionCountDuringDeleteTopic(quorum: String): Unit = {
     val topicPartition = new TopicPartition(topic, 0)
-    val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-    // create brokers
-    val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
-    this.servers = allServers
-    val servers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
-    // create the topic
-    TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, servers)
-    // wait until replica log is created on every broker
-    TestUtils.waitUntilTrue(() => 
servers.forall(_.getLogManager.getLog(topicPartition).isDefined),
-      "Replicas for topic test not created.")
-    // shutdown a broker to make sure the following topic deletion will be 
suspended
-    val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-    val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
-    follower.shutdown()
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
 
-    // make sure deletion of all of the topic's replicas have been tried
-    ensureControllerExists()
-    val (controller, controllerId) = getController()
-    val allReplicasForTopic = getAllReplicasFromAssignment(topic, 
expectedReplicaAssignment)
-    TestUtils.waitUntilTrue(() => {
-      val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, 
ReplicaDeletionSuccessful)
-      val offlineReplicas = 
controller.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
-      allReplicasForTopic == (replicasInDeletionSuccessful union 
offlineReplicas)
-    }, s"Not all replicas for topic $topic are in states of either 
ReplicaDeletionSuccessful or OfflineReplica")
+    if (isKRaftTest()) {
+      val topicPartition = new TopicPartition(topic, 0)
+      val allBrokers = createTestTopicAndCluster(topic, 4, deleteTopicEnabled 
= true)
+      this.brokers = allBrokers
+      val partitionHostingBrokers = allBrokers.filter(b => 
expectedReplicaAssignment(0).contains(b.config.brokerId))
+
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => 
partitionHostingBrokers.forall(_.logManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be 
suspended
+      val leaderIdOpt = 
TestUtils.waitUntilLeaderIsKnown(partitionHostingBrokers, topicPartition)
+      val follower = partitionHostingBrokers.filter(s => s.config.brokerId != 
leaderIdOpt).last
+      follower.shutdown()
+      // start topic deletion
+      admin.deleteTopics(Collections.singletonList(topic)).all().get()
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(partitionHostingBrokers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
+      }
 
-    // increase the partition count for topic
-    val props = new Properties()
-    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(servers))
-    TestUtils.resource(Admin.create(props)) { adminClient =>
-      try {
-        adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
-      } catch {
-        case _: ExecutionException =>
+      // bring back the failed broker
+      follower.startup()
+      TestUtils.verifyTopicDeletion(null, topic, 2, partitionHostingBrokers)
+    } else {
+      val brokerConfigs = TestUtils.createBrokerConfigs(4, zkConnect, false)
+      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+      // create brokers
+      val allServers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
+      this.servers = allServers
+      val partitionHostingServers = allServers.filter(s => 
expectedReplicaAssignment(0).contains(s.config.brokerId))
+      // create the topic
+      TestUtils.createTopic(zkClient, topic, expectedReplicaAssignment, 
partitionHostingServers)
+      // wait until replica log is created on every broker
+      TestUtils.waitUntilTrue(() => 
partitionHostingServers.forall(_.getLogManager.getLog(topicPartition).isDefined),
+        "Replicas for topic test not created.")
+
+      // shutdown a broker to make sure the following topic deletion will be 
suspended
+      val leaderIdOpt = zkClient.getLeaderForPartition(topicPartition)
+      assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
+      val follower = partitionHostingServers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
+      follower.shutdown()
+      // start topic deletion
+      adminZkClient.deleteTopic(topic)
+
+      // make sure deletion of all of the topic's replicas have been tried
+      ensureControllerExists()
+      val (controller, controllerId) = getController()
+      val allReplicasForTopic = getAllReplicasFromAssignment(topic, 
expectedReplicaAssignment)
+      TestUtils.waitUntilTrue(() => {
+        val replicasInDeletionSuccessful = 
controller.kafkaController.controllerContext.replicasInState(topic, 
ReplicaDeletionSuccessful)
+        val offlineReplicas = 
controller.kafkaController.controllerContext.replicasInState(topic, 
OfflineReplica)
+        allReplicasForTopic == (replicasInDeletionSuccessful union 
offlineReplicas)
+      }, s"Not all replicas for topic $topic are in states of either 
ReplicaDeletionSuccessful or OfflineReplica")
+
+      // increase the partition count for topic
+      val props = new Properties()
+      props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
TestUtils.plaintextBootstrapServers(partitionHostingServers))
+      TestUtils.resource(Admin.create(props)) { adminClient =>
+        try {
+          adminClient.createPartitions(Map(topic -> 
NewPartitions.increaseTo(2)).asJava).all().get()
+        } catch {
+          case _: ExecutionException =>
+        }
       }
+      // trigger a controller switch now
+      val previousControllerId = controllerId
+
+      controller.shutdown()
+
+      ensureControllerExists()
+      // wait until a new controller to show up
+      TestUtils.waitUntilTrue(() => {
+        val (newController, newControllerId) = getController()
+        newControllerId != previousControllerId
+      }, "The new controller should not have the failed controller id")
+
+      // bring back the failed brokers
+      follower.startup()
+      controller.startup()
+      TestUtils.verifyTopicDeletion(zkClient, topic, 2, 
partitionHostingServers)
     }
-    // trigger a controller switch now
-    val previousControllerId = controllerId
-
-    controller.shutdown()
-
-    ensureControllerExists()
-    // wait until a new controller to show up
-    TestUtils.waitUntilTrue(() => {
-      val (newController, newControllerId) = getController()
-      newControllerId != previousControllerId
-    }, "The new controller should not have the failed controller id")
-
-    // bring back the failed brokers
-    follower.startup()
-    controller.startup()
-    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
   }
 
-
-  @Test
-  def testDeleteTopicDuringAddPartition(): Unit = {
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic)
-    val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 
0))
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-    val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicDuringAddPartition(quorum: String): Unit = {
+    brokers = createTestTopicAndCluster(topic)
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsKnown(brokers, new 
TopicPartition(topic, 0))
+    val follower = brokers.filter(_.config.brokerId != leaderIdOpt).last
     val newPartition = new TopicPartition(topic, 1)
-    // capture the brokers before we shutdown so that we don't fail validation 
in `addPartitions`
-    val brokers = adminZkClient.getBrokerMetadatas()
-    follower.shutdown()
-    // wait until the broker has been removed from ZK to reduce non-determinism
-    TestUtils.waitUntilTrue(() => 
zkClient.getBroker(follower.config.brokerId).isEmpty,
-      s"Follower ${follower.config.brokerId} was not removed from ZK")
-    // add partitions to topic
-    adminZkClient.addPartitions(topic, expectedReplicaFullAssignment, brokers, 
2,
-      Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
+
+    if (isKRaftTest()) {

Review Comment:
   The type of the `follower` object differs depending on the mode: in KRaft 
it is a KafkaBroker, and in ZK it is a KafkaServer. Even though both support 
the shutdown() method, I'm not sure we can define a single object that takes 
different types. I did try this, but I ended up with many if statements and 
it didn't quite work. So I split the test into just two branches, since the 
only common code was increasePartitions(). I'm happy to move 
increasePartitions() out of the if/else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to