ahuang98 commented on code in PR #12479: URL: https://github.com/apache/kafka/pull/12479#discussion_r941848002
########## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala: ########## @@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAutoCreateTopic(quorum: String): Unit = { val producer = TestUtils.createProducer(bootstrapServers()) + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + val adminClient = Admin.create(props) + + TestUtils.waitUntilTrue( + () => brokers.head.metadataCache.getAliveBrokers().size == numServers, + "Timed out waiting for all brokers to become unfenced") + try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") - // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map { case (topicPartition, replicas) => - topicPartition.partition -> replicas + val partition = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get(). + partitions().stream().filter(_.partition == 0).findAny() + assertTrue(partition.isPresent, "Partition [topic,0] should exist") + assertFalse(partition.get().leader().isEmpty, "Leader should exist for partition [topic,0]") + + val assignment = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map { + case (topicName, topicDescriptionFuture) => + try topicName -> topicDescriptionFuture.get + catch { + case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] => + throw new ExecutionException( + new UnknownTopicOrPartitionException(s"Topic $topicName not found.")) Review Comment: https://issues.apache.org/jira/browse/KAFKA-14153 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org