dengziming commented on code in PR #14846:
URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793


##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
       */
 
     val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, 
enableControlledShutdown = false)

Review Comment:
   Why not use `createTestTopicAndCluster` directly?



##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, 
ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, 
UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._
 
 class DeleteTopicTest extends QuorumTestHarness {
 
+  var brokers: Seq[KafkaBroker] = Seq()
+
   var servers: Seq[KafkaServer] = Seq()

Review Comment:
   I thinks it's unnecessary to keep both servers and brokers, we can use 
`KafkaBroker` in most case, or cast it to `KafkaServer` if necessary



##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
    server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => 
adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")
 
-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: 
Boolean = true, replicaAssignment: Map[Int, List[Int]] = 
expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, 
enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, 
deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = 
expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, 
zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", 
deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: 
Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => 
TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val brokers = brokerConfigs.map(b => 
createBroker(KafkaConfig.fromProps(b)))
+
+    admin = TestUtils.createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

Review Comment:
   Add a helper method such as `getAdminClient` in this class, we only create a 
new one, if the admin is null or is closed.



##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
    server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
 
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }
 
-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {

Review Comment:
   Let's just keep it unchanged since it's not testable in kraft mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to