ahuang98 commented on code in PR #12479: URL: https://github.com/apache/kafka/pull/12479#discussion_r943125682
########## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ########## @@ -45,103 +59,122 @@ class DeleteTopicTest extends QuorumTestHarness { @AfterEach override def tearDown(): Unit = { - TestUtils.shutdownServers(servers) + adminClient.close() + TestUtils.shutdownServers(brokers) super.tearDown() } - @Test - def testDeleteTopicWithAllAliveReplicas(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testDeleteTopicWithAllAliveReplicas(quorum: String): Unit = { val topic = "test" - servers = createTestTopicAndCluster(topic) - // start topic deletion - adminZkClient.deleteTopic(topic) - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + createTestTopicAndCluster(topic) + TestUtils.deleteTopicWithAdmin(adminClient, topic, brokers) } - @Test - def testResumeDeleteTopicWithRecoveredFollower(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testResumeDeleteTopicWithRecoveredFollower(quorum: String): Unit = { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - servers = createTestTopicAndCluster(topic) + createTestTopicAndCluster(topic) // shut down one follower replica - val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) - assertTrue(leaderIdOpt.isDefined, "Leader should exist for partition [test,0]") - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last + val partition = getTopicPartitionInfo(adminClient, topic, 0) + assertTrue(partition.isPresent, "Partition [test,0] should exist") + val leaderNode = partition.get().leader() + assertFalse(leaderNode.isEmpty, "Leader should exist for partition [test,0]") + val follower = brokers.filter(s => s.config.brokerId != leaderNode.id()).last follower.shutdown() // start topic deletion - adminZkClient.deleteTopic(topic) + adminClient.deleteTopics(Collections.singleton(topic)) // check if all replicas but the one that is shut down has deleted the log TestUtils.waitUntilTrue(() => - servers.filter(s => s.config.brokerId != follower.config.brokerId) - .forall(_.getLogManager.getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") + brokers.filter(s => s.config.brokerId != follower.config.brokerId) + .forall(_.logManager.getLog(topicPartition).isEmpty), "Replicas 0,1 have not deleted log.") // ensure topic deletion is halted - TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic), - "Admin path /admin/delete_topics/test path deleted even when a follower replica is down") + if (isKRaftTest()) { + val aliveBrokers = brokers.filter(b => b != follower) + TestUtils.waitForAllPartitionsMetadata(aliveBrokers, topic, 0) + TestUtils.waitForAllPartitionsMetadata(Seq(follower), topic, 1) + assertThrows(classOf[ExecutionException], () => adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get()) + } else { + TestUtils.waitUntilTrue(() => zkClient.isTopicMarkedForDeletion(topic), + "Admin path /admin/delete_topics/test path deleted even when a follower replica is down") + } // restart follower replica follower.startup() - TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers) + TestUtils.waitForAllPartitionsMetadata(brokers, topic, 0) } - @Test - def testResumeDeleteTopicOnControllerFailover(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testResumeDeleteTopicOnControllerFailover(quorum: String): Unit = { val topicPartition = new TopicPartition("test", 0) val topic = topicPartition.topic - servers = createTestTopicAndCluster(topic) - val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist")) - val controller = servers.filter(s => s.config.brokerId == controllerId).head - val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0)) - val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last + createTestTopicAndCluster(topic) + + val partition = getTopicPartitionInfo(adminClient, topic, 0) + assertTrue(partition.isPresent, "Partition [test,0] should exist") + val leaderNode = partition.get().leader() + assertFalse(leaderNode.isEmpty, "Leader should exist for partition [test,0]") + + val verifyDeletionExecutable: Executable = () => TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers) + var shutdownController: Executable = null + var restartController: Executable = null + var follower: KafkaBroker = null + if (isKRaftTest()) { + val prevPort = controllerServerPort + shutdownController = () => controllerServer.shutdown() + restartController = () => startControllerServer(prevPort) + follower = brokers.filter(s => s.config.brokerId != leaderNode.id()).last + } else { + val controllerId = zkClient.getControllerId.getOrElse(fail("Controller doesn't exist")) + val controller = brokers.filter(s => s.config.brokerId == controllerId).head + shutdownController = () => controller.shutdown() + restartController = () => controller.startup() + follower = brokers.filter(s => s.config.brokerId != leaderNode.id() && s.config.brokerId != controllerId).last + } Review Comment: I tried to move some of this logic into QuorumTestHarness, seems a bit awkward. In ZK mode restarting the controller would just mean restarting the broker which was the prior controller, but in KRaft mode it would require actually restarting the controller server. This means we have to store the prior controller in QuorumTestHarness or accept the broker as an argument for the restartController function. It also seems a bit awkward to store servers in ZooKeeperQuorumImplementation just so we can find the controller for shutdown purposes purposes. This logic might make more sense in KafkaServerTestHarness or IntegrationTestHarness, but as explained in a prior comment this test wouldn't benefit from extending from those harnesses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org