This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b08b64c2d85 KAFKA-18098 add kraft support to
testReplicaPlacementAllServers and testReplicaPlacementPartialServers (#17955)
b08b64c2d85 is described below
commit b08b64c2d854b09c30fed2b9c6aa9fcdad96bbee
Author: Peter Lee <[email protected]>
AuthorDate: Thu Nov 28 05:21:47 2024 +0800
KAFKA-18098 add kraft support to testReplicaPlacementAllServers and testReplicaPlacementPartialServers (#17955)
Reviewers: Yung <[email protected]>, Colin P. McCabe
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 43 ++++++++++------------
1 file changed, 20 insertions(+), 23 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index e05ea8b18a0..384d067e2b6 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,14 +17,13 @@
package kafka.admin
-import java.util.{Collections, Optional}
+import java.util.Collections
import kafka.controller.ReplicaAssignment
import kafka.server.{BaseRequestTest, BrokerServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@@ -178,7 +177,7 @@ class AddPartitionsTest extends BaseRequestTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk")) // TODO: add kraft support
+ @ValueSource(strings = Array("kraft"))
def testReplicaPlacementAllServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic3,
NewPartitions.increaseTo(7))).all().get()
@@ -194,17 +193,19 @@ class AddPartitionsTest extends BaseRequestTest {
new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
val topicMetadata = response.topicMetadata.asScala.head
- validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1))
- validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1))
- validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2))
- validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3))
- validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1))
- validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2))
- validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
+
+ assertEquals(7, topicMetadata.partitionMetadata.size)
+ for (partition <- topicMetadata.partitionMetadata.asScala) {
+ val replicas = partition.replicaIds.asScala.toSet
+ assertEquals(4, replicas.size, s"Partition ${partition.partition} should have 4 replicas")
+ assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
+ assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
+ assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
+ }
}
@ParameterizedTest
- @ValueSource(strings = Array("zk")) // TODO: add kraft support
+ @ValueSource(strings = Array("kraft"))
def testReplicaPlacementPartialServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic2,
NewPartitions.increaseTo(3))).all().get()
@@ -216,19 +217,15 @@ class AddPartitionsTest extends BaseRequestTest {
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
val topicMetadata = response.topicMetadata.asScala.head
- validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2))
- validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2))
- validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3))
- }
-
- def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int,
- expectedReplicas: Set[Int]): Unit = {
- val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId)
- assertTrue(partitionOpt.isDefined, s"Partition $partitionId should exist")
- val partition = partitionOpt.get
- assertEquals(Optional.of(expectedLeaderId), partition.leaderId, "Partition leader id should match")
- assertEquals(expectedReplicas, partition.replicaIds.asScala.toSet, "Replica set should match")
+ assertEquals(3, topicMetadata.partitionMetadata.size)
+ for (partition <- topicMetadata.partitionMetadata.asScala) {
+ val replicas = partition.replicaIds.asScala.toSet
+ assertEquals(2, replicas.size, s"Partition ${partition.partition} should have 2 replicas")
+ assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
+ assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
+ assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
+ }
}
}