tinaselenge commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1453250983


##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
    server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => 
adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: 
Boolean = true, replicaAssignment: Map[Int, List[Int]] = 
expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, 
enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, 
deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = 
expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, 
zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", 
deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val brokers = brokerConfigs.map(b => 
createBroker(KafkaConfig.fromProps(b)))
+
+    admin = TestUtils.createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

Review Comment:
   Not sure if this is necessary,  as we only create the admin here when a new 
set of brokers are created. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to