ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r941848002


##########
core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala:
##########
@@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
 
   def generateConfigs =
     (0 until numServers) map { node =>
-      TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+      TestUtils.createBrokerConfig(node, zkConnectOrNull, 
enableControlledShutdown = false, rack = Some((node / 2).toString))
     } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testAutoCreateTopic(quorum: String): Unit = {
     val producer = TestUtils.createProducer(bootstrapServers())
+    val props = new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+    val adminClient = Admin.create(props)
+
+    TestUtils.waitUntilTrue(
+      () => brokers.head.metadataCache.getAliveBrokers().size == numServers,
+      "Timed out waiting for all brokers to become unfenced")
+
     try {
       // Send a message to auto-create the topic
       val record = new ProducerRecord(topic, null, "key".getBytes, 
"value".getBytes)
       assertEquals(0L, producer.send(record).get.offset, "Should have offset 
0")
 
-      // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-      val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map 
{ case (topicPartition, replicas) =>
-        topicPartition.partition -> replicas
+      val partition = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get().
+        partitions().stream().filter(_.partition == 0).findAny()
+      assertTrue(partition.isPresent, "Partition [topic,0] should exist")
+      assertFalse(partition.get().leader().isEmpty, "Leader should exist for 
partition [topic,0]")
+
+      val assignment = 
adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map
 {
+        case (topicName, topicDescriptionFuture) =>
+          try topicName -> topicDescriptionFuture.get
+          catch {
+            case t: ExecutionException if 
t.getCause.isInstanceOf[UnknownTopicOrPartitionException] =>
+              throw new ExecutionException(
+                new UnknownTopicOrPartitionException(s"Topic $topicName not 
found."))

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-14153



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to