This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e01a921550 KAFKA-15722: Add KRaft support in RackAwareAutoTopicCreationTest
9e01a921550 is described below
commit 9e01a9215506f82e3ce9386816123005e6513fe8
Author: TapDang <[email protected]>
AuthorDate: Mon Nov 11 22:18:12 2024 -0500
KAFKA-15722: Add KRaft support in RackAwareAutoTopicCreationTest
Reviewers: Colin P. McCabe <[email protected]>
---
.../kafka/api/RackAwareAutoTopicCreationTest.scala | 51 ++++++++++++++++------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index a8d2431f80b..9bd23de7137 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -17,34 +17,54 @@
package kafka.api
import java.util.Properties
-
-import kafka.admin.{RackAwareMode, RackAwareTest}
+import kafka.admin.RackAwareTest
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
import scala.collection.Map
+import scala.jdk.CollectionConverters.ListHasAsScala
class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with
RackAwareTest {
val numServers = 4
val numPartitions = 8
val replicationFactor = 2
val overridingProps = new Properties()
+ var admin: Admin = _
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG,
numPartitions.toString)
overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG,
replicationFactor.toString)
def generateConfigs =
(0 until numServers) map { node =>
- TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown =
false, rack = Some((node / 2).toString))
+ TestUtils.createBrokerConfig(node, null, enableControlledShutdown =
false, rack = Some((node / 2).toString))
} map (KafkaConfig.fromProps(_, overridingProps))
private val topic = "topic"
- @Test
- def testAutoCreateTopic(): Unit = {
+ @BeforeEach
+ override def setUp(testInfo: TestInfo): Unit = {
+ super.setUp(testInfo)
+ admin = TestUtils.createAdminClient(brokers,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ if (admin != null) admin.close()
+ super.tearDown()
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = {
val producer = TestUtils.createProducer(bootstrapServers())
try {
// Send a message to auto-create the topic
@@ -52,15 +72,20 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
assertEquals(0L, producer.send(record).get.offset, "Should have offset
0")
// double check that the topic is created with leader elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map
{ case (topicPartition, replicas) =>
- topicPartition.partition -> replicas
- }
- val brokerMetadatas =
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
+ TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
+ val assignment = getReplicaAssignment(topic)
+ val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers()
val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
assertEquals(expectedMap, brokerMetadatas.map(b => b.id ->
b.rack.get).toMap)
- checkReplicaDistribution(assignment, expectedMap, numServers,
numPartitions, replicationFactor)
+ checkReplicaDistribution(assignment, expectedMap, numServers,
numPartitions, replicationFactor,
+ verifyLeaderDistribution = false)
} finally producer.close()
}
+
+ private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = {
+ TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition =>
+ partition.partition -> partition.replicas.asScala.map(_.id).toSeq
+ }.toMap
+ }
}