dengziming commented on code in PR #14846: URL: https://github.com/apache/kafka/pull/14846#discussion_r1448306793
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -425,8 +539,10 @@ class DeleteTopicTest extends QuorumTestHarness {
       */
     val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
-    val topic = "test"
-    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnectOrNull, enableControlledShutdown = false)

Review Comment:
   Why not use `createTestTopicAndCluster` directly?

##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -19,78 +19,95 @@ package kafka.admin
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-
 import scala.collection.Seq
 import kafka.log.UnifiedLog
 import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils._
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, Test}
-import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.errors.{TopicDeletionDisabledException, UnknownTopicOrPartitionException}
+import org.apache.kafka.metadata.BrokerState
+
 import scala.jdk.CollectionConverters._

 class DeleteTopicTest extends QuorumTestHarness {
+  var brokers: Seq[KafkaBroker] = Seq()
+
   var servers: Seq[KafkaServer] = Seq()

Review Comment:
   I think it's unnecessary to keep both `servers` and `brokers`; we can use `KafkaBroker` in most cases and cast to `KafkaServer` where necessary.
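   For illustration, a minimal sketch of what that could look like, assuming the imports already shown in the diff above (the `zkServers` helper is a hypothetical name, not an existing method in this test):

   ```scala
   class DeleteTopicTest extends QuorumTestHarness {
     // Single field typed to the common trait; both ZK-mode KafkaServer and
     // KRaft BrokerServer instances can be stored here.
     var brokers: Seq[KafkaBroker] = Seq()

     // Hypothetical helper: only the ZK-specific code paths need KafkaServer,
     // so narrow the type there instead of keeping a parallel `servers` field.
     private def zkServers: Seq[KafkaServer] =
       brokers.collect { case zkBroker: KafkaServer => zkBroker }
   }
   ```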
##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }

-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDeleteTopicAlreadyMarkedAsDeleted(quorum: String): Unit = {
     val topicPartition = new TopicPartition("test", 0)
     val topic = topicPartition.topic
-    servers = createTestTopicAndCluster(topic)
+    brokers = createTestTopicAndCluster(topic)
     // start topic deletion
-    adminZkClient.deleteTopic(topic)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
     // try to delete topic marked as deleted
-    assertThrows(classOf[TopicAlreadyMarkedForDeletionException], () => adminZkClient.deleteTopic(topic))
+    // start topic deletion
+    TestUtils.waitUntilTrue(() => {
+      try {
+        admin.deleteTopics(Collections.singletonList(topic)).all().get()
+        false
+      } catch {
+        case e: ExecutionException =>
+          classOf[UnknownTopicOrPartitionException].equals(e.getCause.getClass)
+      }
+    }, s"Topic ${topic} should be marked for deletion or already deleted.")

-    TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
+    TestUtils.verifyTopicDeletion(zkClientOrNull, topic, 1, brokers)
   }

-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+  private def createTestTopicAndCluster(topic: String, numOfConfigs: Int = 3, deleteTopicEnabled: Boolean = true, replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaBroker] = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(numOfConfigs, zkConnectOrNull, enableControlledShutdown = false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
     createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }

-  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment: Map[Int, List[Int]]): Seq[KafkaBroker] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
-    val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    val brokers = brokerConfigs.map(b => createBroker(KafkaConfig.fromProps(b)))
+
+    admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))

Review Comment:
   Add a helper method such as `getAdminClient` in this class, so that we only create a new admin client when `admin` is null or has already been closed. (A rough sketch of one possible shape is at the end of this message.)

##########
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala:
##########
@@ -354,39 +433,54 @@ class DeleteTopicTest extends QuorumTestHarness {
     server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
     // delete topic
-    adminZkClient.deleteTopic("test")
-    TestUtils.verifyTopicDeletion(zkClient, "test", 1, servers)
+    admin.deleteTopics(Collections.singletonList(topic)).all().get()
+    TestUtils.verifyTopicDeletion(zkClientOrNull, "test", 1, brokers)
   }

-  @Test
-  def testDeleteTopicAlreadyMarkedAsDeleted(): Unit = {

Review Comment:
   Let's just keep it unchanged since it's not testable in kraft mode.
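   As referenced above, a rough sketch of what a `getAdminClient` helper inside `DeleteTopicTest` might look like. It is only a sketch under the assumptions of this PR: it reuses the existing `admin` field and the `TestUtils.createAdminClient` call shown in the diff, and it approximates the "is closed" check by expecting call sites that close the client to also null the field.

   ```scala
   // Hypothetical helper: return the cached Admin client, creating a new one
   // only when none exists yet. Wherever the client is closed (e.g. in
   // tearDown: admin.close(); admin = null), null the field so the next call
   // knows to recreate it.
   private def getAdminClient(brokers: Seq[KafkaBroker]): Admin = {
     if (admin == null) {
       admin = TestUtils.createAdminClient(brokers,
         ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     }
     admin
   }
   ```

   The second `createTestTopicAndCluster` overload could then call `getAdminClient(brokers)` instead of unconditionally creating a new client.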