This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b08b64c2d85 KAFKA-18098 add kraft support to 
testReplicaPlacementAllServers and testReplicaPlacementPartialServers (#17955)
b08b64c2d85 is described below

commit b08b64c2d854b09c30fed2b9c6aa9fcdad96bbee
Author: Peter Lee <[email protected]>
AuthorDate: Thu Nov 28 05:21:47 2024 +0800

    KAFKA-18098 add kraft support to testReplicaPlacementAllServers and 
testReplicaPlacementPartialServers (#17955)
    
    Reviewers: Yung <[email protected]>, Colin P. McCabe 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 43 ++++++++++------------
 1 file changed, 20 insertions(+), 23 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index e05ea8b18a0..384d067e2b6 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,14 +17,13 @@
 
 package kafka.admin
 
-import java.util.{Collections, Optional}
+import java.util.Collections
 import kafka.controller.ReplicaAssignment
 import kafka.server.{BaseRequestTest, BrokerServer}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -178,7 +177,7 @@ class AddPartitionsTest extends BaseRequestTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  @ValueSource(strings = Array("kraft"))
   def testReplicaPlacementAllServers(quorum: String): Unit = {
     admin.createPartitions(Collections.singletonMap(topic3, 
NewPartitions.increaseTo(7))).all().get()
 
@@ -194,17 +193,19 @@ class AddPartitionsTest extends BaseRequestTest {
       new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
     val topicMetadata = response.topicMetadata.asScala.head
-    validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1))
-    validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1))
-    validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2))
-    validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3))
-    validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1))
-    validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2))
-    validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
+
+    assertEquals(7, topicMetadata.partitionMetadata.size)
+    for (partition <- topicMetadata.partitionMetadata.asScala) {
+      val replicas = partition.replicaIds.asScala.toSet
+      assertEquals(4, replicas.size, s"Partition ${partition.partition} should 
have 4 replicas")
+      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only 
include brokers 0-3")
+      assertTrue(partition.leaderId.isPresent, s"Partition 
${partition.partition} should have a leader")
+      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be 
one of the replicas")
+    }
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("zk")) // TODO: add kraft support
+  @ValueSource(strings = Array("kraft"))
   def testReplicaPlacementPartialServers(quorum: String): Unit = {
     admin.createPartitions(Collections.singletonMap(topic2, 
NewPartitions.increaseTo(3))).all().get()
 
@@ -216,19 +217,15 @@ class AddPartitionsTest extends BaseRequestTest {
       new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
     assertEquals(1, response.topicMetadata.size)
     val topicMetadata = response.topicMetadata.asScala.head
-    validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2))
-    validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2))
-    validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3))
-  }
-
-  def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, 
expectedLeaderId: Int,
-                                expectedReplicas: Set[Int]): Unit = {
-    val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == 
partitionId)
-    assertTrue(partitionOpt.isDefined, s"Partition $partitionId should exist")
-    val partition = partitionOpt.get
 
-    assertEquals(Optional.of(expectedLeaderId), partition.leaderId, "Partition 
leader id should match")
-    assertEquals(expectedReplicas, partition.replicaIds.asScala.toSet, 
"Replica set should match")
+    assertEquals(3, topicMetadata.partitionMetadata.size)
+    for (partition <- topicMetadata.partitionMetadata.asScala) {
+      val replicas = partition.replicaIds.asScala.toSet
+      assertEquals(2, replicas.size, s"Partition ${partition.partition} should 
have 2 replicas")
+      assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only 
include brokers 0-3")
+      assertTrue(partition.leaderId.isPresent, s"Partition 
${partition.partition} should have a leader")
+      assertTrue(replicas.contains(partition.leaderId.get), "Leader should be 
one of the replicas")
+    }
   }
 
 }

Reply via email to