ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r943125682


##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -45,103 +59,122 @@ class DeleteTopicTest extends QuorumTestHarness {
 
   @AfterEach
   override def tearDown(): Unit = {
-    TestUtils.shutdownServers(servers)
+    adminClient.close()
+    TestUtils.shutdownServers(brokers)
     super.tearDown()
   }
 
-  @Test
-  def testDeleteTopicWithAllAliveReplicas(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicWithAllAliveReplicas(quorum: String): Unit = {
     val topic = "test"
-    servers = createTestTopicAndCluster(topic)
-    // start topic deletion
-    adminZkClient.deleteTopic(topic)
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    createTestTopicAndCluster(topic)
+    TestUtils.deleteTopicWithAdmin(adminClient, topic, brokers)
   }
 
-  @Test
-  def testResumeDeleteTopicWithRecoveredFollower(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResumeDeleteTopicWithRecoveredFollower(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    createTestTopicAndCluster(topic)
     // shut down one follower replica
-    val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 
0))
-    assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition 
[test,0]")
-    val follower = servers.filter(s => s.config.brokerId != 
leaderIdOpt.get).last
+    val partition = getTopicPartitionInfo(adminClient, topic, 0)
+    assertTrue(partition.isPresent, "Partition [test,0] should exist")
+    val leaderNode = partition.get().leader()
+    assertFalse(leaderNode.isEmpty, "Leader should exist for partition 
[test,0]")
+    val follower = brokers.filter(s => s.config.brokerId != 
leaderNode.id()).last
     follower.shutdown()
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    adminClient.deleteTopics(Collections.singleton(topic))
     // check if all replicas but the one that is shut down has deleted the log
     TestUtils.waitUntilTrue(() =>
-      servers.filter(s => s.config.brokerId != follower.config.brokerId)
-        .forall(_.getLogManager.getLog(topicPartition).isEmpty), "Replicas 0,1 
have not deleted log.")
+      brokers.filter(s => s.config.brokerId != follower.config.brokerId)
+        .forall(_.logManager.getLog(topicPartition).isEmpty), "Replicas 0,1 
have not deleted log.")
     // ensure topic deletion is halted
-    TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic),
-      "Admin path /admin/delete_topics/test path deleted even when a follower 
replica is down")
+    if (isKRaftTest()) {
+      val aliveBrokers = brokers.filter(b => b != follower)
+      TestUtils.waitForAllPartitionsMetadata(aliveBrokers, topic, 0)
+      TestUtils.waitForAllPartitionsMetadata(Seq(follower), topic, 1)
+      assertThrows(classOf[ExecutionException], () => 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get())
+    } else {
+      TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic),
+        "Admin path /admin/delete_topics/test path deleted even when a 
follower replica is down")
+    }
     // restart follower replica
     follower.startup()
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.waitForAllPartitionsMetadata(brokers, topic, 0)
   }
 
-  @Test
-  def testResumeDeleteTopicOnControllerFailover(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResumeDeleteTopicOnControllerFailover(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
-    val controllerId = zkClient.getControllerId.getOrElse(fail("Controller 
doesn't exist"))
-    val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
-    val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 
0))
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && 
s.config.brokerId != controllerId).last
+    createTestTopicAndCluster(topic)
+
+    val partition = getTopicPartitionInfo(adminClient, topic, 0)
+    assertTrue(partition.isPresent, "Partition [test,0] should exist")
+    val leaderNode = partition.get().leader()
+    assertFalse(leaderNode.isEmpty, "Leader should exist for partition 
[test,0]")
+
+    val verifyDeletionExecutable: Executable = () => 
TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
+    var shutdownController: Executable = null
+    var restartController: Executable = null
+    var follower: KafkaBroker = null
+    if (isKRaftTest()) {
+      val prevPort = controllerServerPort
+      shutdownController = () => controllerServer.shutdown()
+      restartController = () => startControllerServer(prevPort)
+      follower = brokers.filter(s => s.config.brokerId != leaderNode.id()).last
+    } else {
+      val controllerId = zkClient.getControllerId.getOrElse(fail("Controller 
doesn't exist"))
+      val controller = brokers.filter(s => s.config.brokerId == 
controllerId).head
+      shutdownController = () => controller.shutdown()
+      restartController = () => controller.startup()
+      follower = brokers.filter(s => s.config.brokerId != leaderNode.id() && 
s.config.brokerId != controllerId).last
+    }

Review Comment:
   I tried to move some of this logic into QuorumTestHarness, seems a bit 
awkward. In ZK mode restarting the controller would just mean restarting the 
broker which was the prior controller, but in KRaft mode it would require 
actually restarting the controller server. This means we have to store the 
prior controller in QuorumTestHarness or accept the broker as an argument for 
the restartController function. 
   It also seems a bit awkward to store servers in 
ZooKeeperQuorumImplementation just so we can find the controller for shutdown 
purposes purposes. This logic might make more sense in KafkaServerTestHarness 
or IntegrationTestHarness, but as explained in a prior comment this test 
wouldn't benefit from extending from those harnesses.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to