This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch cmccabe_2024_02_26_fix in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 925c61a99ce51103af1ab3188997def5ad1f767b Author: Colin P. McCabe <cmcc...@apache.org> AuthorDate: Mon Feb 26 13:24:16 2024 -0800 MINOR: remove test constructor for PartitionAssignment Remove the test constructor for PartitionAssignment and remove the TODO. --- .../kafka/server/KRaftClusterTest.scala | 39 +++++++++++++++++++++- .../metadata/placement/PartitionAssignment.java | 6 ---- .../controller/PartitionChangeBuilderTest.java | 6 ++-- .../PartitionReassignmentReplicasTest.java | 22 ++++++------ .../controller/ReplicationControlManagerTest.java | 18 +++++----- .../placement/PartitionAssignmentTest.java | 12 ++++--- .../placement/StripedReplicaPlacerTest.java | 17 +++++----- .../metadata/placement/TopicAssignmentTest.java | 9 ++--- 8 files changed, 83 insertions(+), 46 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 130d0e5642e..379a7dd24aa 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter} import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.config.ConfigResource.Type -import org.apache.kafka.common.errors.{PolicyViolationException, UnsupportedVersionException} +import org.apache.kafka.common.errors.{InvalidPartitionsException,PolicyViolationException, UnsupportedVersionException} import org.apache.kafka.common.message.DescribeClusterRequestData import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord} import org.apache.kafka.common.metrics.Metrics @@ -792,6 +792,43 @@ class KRaftClusterTest { } } + /** + * Test that setting the Confluent-specific configuration + * confluent.apply.create.topic.policy.to.create.partitions has the expected effect. + */ + @ParameterizedTest + @ValueSource(strings = Array("3.7-IV0", "3.7-IV2")) + def testCreatePartitions(metadataVersionString: String): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setBootstrapMetadataVersion(MetadataVersion.fromVersionString(metadataVersionString)). + setNumControllerNodes(3).build()). + build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.clientProperties()) + try { + val createResults = admin.createTopics(Arrays.asList( + new NewTopic("foo", 1, 3.toShort), + new NewTopic("bar", 2, 3.toShort))).values() + createResults.get("foo").get() + createResults.get("bar").get() + val increaseResults = admin.createPartitions(Map( + "foo" -> NewPartitions.increaseTo(3), + "bar" -> NewPartitions.increaseTo(2)).asJava).values() + increaseResults.get("foo").get() + assertEquals(classOf[InvalidPartitionsException], assertThrows( + classOf[ExecutionException], () => increaseResults.get("bar").get()).getCause.getClass) + } finally { + admin.close() + } + } finally { + cluster.close() + } + } private def clusterImage( cluster: KafkaClusterTestKit, brokerId: Int diff --git a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java index 177d5311afd..a7012d1505c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/placement/PartitionAssignment.java @@ -17,7 +17,6 @@ package org.apache.kafka.metadata.placement; -import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.Uuid; import java.util.ArrayList; @@ -39,11 +38,6 @@ public class PartitionAssignment { private final List<Integer> replicas; private final List<Uuid> directories; - // TODO remove -- just here for testing - public PartitionAssignment(List<Integer> replicas) { - this(replicas, brokerId -> DirectoryId.UNASSIGNED); - } - public PartitionAssignment(List<Integer> replicas, DefaultDirProvider defaultDirProvider) { this.replicas = Collections.unmodifiableList(new ArrayList<>(replicas)); Uuid[] directories = new Uuid[replicas.size()]; diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index efc9bd2a24f..b956a7f2ef9 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.Replicas; import org.apache.kafka.metadata.placement.DefaultDirProvider; -import org.apache.kafka.metadata.placement.PartitionAssignment; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.junit.jupiter.api.Test; @@ -49,6 +48,7 @@ import static org.apache.kafka.controller.PartitionChangeBuilder.Election; import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; +import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -502,7 +502,7 @@ public class PartitionChangeBuilderTest { @MethodSource("partitionChangeRecordVersions") public void testRemovingReplicaReassignment(short version) { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2))); + partitionAssignment(Replicas.toList(FOO.replicas)), partitionAssignment(Arrays.asList(1, 2))); assertEquals(Collections.singletonList(3), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); assertEquals(Arrays.asList(1, 2, 3), replicas.replicas()); @@ -527,7 +527,7 @@ public class PartitionChangeBuilderTest { @MethodSource("partitionChangeRecordVersions") public void testAddingReplicaReassignment(short version) { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Replicas.toList(FOO.replicas)), new PartitionAssignment(Arrays.asList(1, 2, 3, 4))); + partitionAssignment(Replicas.toList(FOO.replicas)), partitionAssignment(Arrays.asList(1, 2, 3, 4))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.singletonList(4), replicas.adding()); assertEquals(Arrays.asList(1, 2, 3, 4), replicas.replicas()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java index b2bc540bda5..17be98d47f0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionReassignmentReplicasTest.java @@ -24,10 +24,10 @@ import java.util.Optional; import org.apache.kafka.common.Uuid; import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; -import org.apache.kafka.metadata.placement.PartitionAssignment; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,7 +38,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testNoneAddedOrRemoved() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 2, 1))); + partitionAssignment(Arrays.asList(3, 2, 1)), partitionAssignment(Arrays.asList(3, 2, 1))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); assertEquals(Arrays.asList(3, 2, 1), replicas.replicas()); @@ -47,7 +47,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testAdditions() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(3, 2, 1)), new PartitionAssignment(Arrays.asList(3, 6, 2, 1, 5))); + partitionAssignment(Arrays.asList(3, 2, 1)), partitionAssignment(Arrays.asList(3, 6, 2, 1, 5))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Arrays.asList(5, 6), replicas.adding()); assertEquals(Arrays.asList(3, 6, 2, 1, 5), replicas.replicas()); @@ -56,7 +56,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testRemovals() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(3, 1))); + partitionAssignment(Arrays.asList(3, 2, 1, 0)), partitionAssignment(Arrays.asList(3, 1))); assertEquals(Arrays.asList(0, 2), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); assertEquals(Arrays.asList(3, 1, 0, 2), replicas.replicas()); @@ -65,7 +65,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testAdditionsAndRemovals() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(7, 3, 1, 9))); + partitionAssignment(Arrays.asList(3, 2, 1, 0)), partitionAssignment(Arrays.asList(7, 3, 1, 9))); assertEquals(Arrays.asList(0, 2), replicas.removing()); assertEquals(Arrays.asList(7, 9), replicas.adding()); assertEquals(Arrays.asList(7, 3, 1, 9, 0, 2), replicas.replicas()); @@ -74,7 +74,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testRearrangement() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(3, 2, 1, 0)), new PartitionAssignment(Arrays.asList(0, 1, 3, 2))); + partitionAssignment(Arrays.asList(3, 2, 1, 0)), partitionAssignment(Arrays.asList(0, 1, 3, 2))); assertEquals(Collections.emptyList(), replicas.removing()); assertEquals(Collections.emptyList(), replicas.adding()); assertEquals(Arrays.asList(0, 1, 3, 2), replicas.replicas()); @@ -83,7 +83,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testDoesNotCompleteReassignment() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(3, 4, 5))); + partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(3, 4, 5))); assertTrue(replicas.isReassignmentInProgress()); Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional = replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3, 4)); @@ -107,7 +107,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testDoesCompleteReassignmentAllNewReplicas() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(3, 4, 5))); + partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(3, 4, 5))); assertTrue(replicas.isReassignmentInProgress()); Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional = replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3, 4, 5)); @@ -120,7 +120,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testDoesCompleteReassignmentSomeNewReplicas() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3))); + partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(0, 1, 3))); assertTrue(replicas.isReassignmentInProgress()); Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional = replicas.maybeCompleteReassignment(Arrays.asList(0, 1, 2, 3)); @@ -199,7 +199,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testDoesNotCompleteReassignmentIfIsrDoesNotHaveAllTargetReplicas() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3))); + partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(0, 1, 3))); assertTrue(replicas.isReassignmentInProgress()); Optional<PartitionReassignmentReplicas.CompletedReassignment> reassignmentOptional = replicas.maybeCompleteReassignment(Arrays.asList(3)); @@ -209,7 +209,7 @@ public class PartitionReassignmentReplicasTest { @Test public void testOriginalReplicas() { PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( - new PartitionAssignment(Arrays.asList(0, 1, 2)), new PartitionAssignment(Arrays.asList(0, 1, 3))); + partitionAssignment(Arrays.asList(0, 1, 2)), partitionAssignment(Arrays.asList(0, 1, 3))); assertEquals(Arrays.asList(0, 1, 2), replicas.originalReplicas()); } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 3d54720be92..e9885fa8e94 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -87,7 +87,6 @@ import org.apache.kafka.metadata.LeaderRecoveryState; import org.apache.kafka.metadata.PartitionRegistration; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.Replicas; -import org.apache.kafka.metadata.placement.PartitionAssignment; import org.apache.kafka.metadata.placement.StripedReplicaPlacer; import org.apache.kafka.metadata.placement.UsableBroker; import org.apache.kafka.server.common.ApiMessageAndVersion; @@ -148,6 +147,7 @@ import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXC import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor; import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -1584,13 +1584,13 @@ public class ReplicationControlManagerTest { public void testValidateGoodManualPartitionAssignments() throws Exception { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); ctx.registerBrokers(1, 2, 3); - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1)), OptionalInt.of(1)); - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1)), OptionalInt.empty()); - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2, 3)), OptionalInt.of(3)); - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2, 3)), OptionalInt.empty()); } @@ -1600,20 +1600,20 @@ public class ReplicationControlManagerTest { ctx.registerBrokers(1, 2); assertEquals("The manual partition assignment includes an empty replica list.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList()), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList()), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes broker 3, but no such " + "broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 3)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2, 3)), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes the broker 2 more than " + "once.", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2, 2)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2, 2)), OptionalInt.empty())).getMessage()); assertEquals("The manual partition assignment includes a partition with 2 " + "replica(s), but this is not consistent with previous partitions, which have " + "3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () -> - ctx.replicationControl.validateManualPartitionAssignment(new PartitionAssignment(asList(1, 2)), + ctx.replicationControl.validateManualPartitionAssignment(partitionAssignment(asList(1, 2)), OptionalInt.of(3))).getMessage()); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java index 06cf5ae50d5..6dca18b4dd5 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/PartitionAssignmentTest.java @@ -20,6 +20,7 @@ package org.apache.kafka.metadata.placement; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import org.apache.kafka.common.DirectoryId; import org.apache.kafka.common.Uuid; import org.junit.jupiter.api.Test; @@ -27,20 +28,23 @@ import java.util.Arrays; import java.util.List; public class PartitionAssignmentTest { + public static PartitionAssignment partitionAssignment(List<Integer> replicas) { + return new PartitionAssignment(replicas, __ -> DirectoryId.MIGRATING); + } @Test public void testPartitionAssignmentReplicas() { List<Integer> replicas = Arrays.asList(0, 1, 2); - assertEquals(replicas, new PartitionAssignment(replicas).replicas()); + assertEquals(replicas, partitionAssignment(replicas).replicas()); } @Test public void testConsistentEqualsAndHashCode() { List<PartitionAssignment> partitionAssignments = Arrays.asList( - new PartitionAssignment( + partitionAssignment( Arrays.asList(0, 1, 2) ), - new PartitionAssignment( + partitionAssignment( Arrays.asList(1, 2, 0) ) ); @@ -49,7 +53,7 @@ public class PartitionAssignmentTest { for (int j = 0; j < partitionAssignments.size(); j++) { if (i == j) { assertEquals(partitionAssignments.get(i), partitionAssignments.get(j)); - assertEquals(partitionAssignments.get(i), new PartitionAssignment(partitionAssignments.get(i).replicas())); + assertEquals(partitionAssignments.get(i), partitionAssignment(partitionAssignments.get(i).replicas())); assertEquals(partitionAssignments.get(i).hashCode(), partitionAssignments.get(j).hashCode()); } else { assertNotEquals(partitionAssignments.get(i), partitionAssignments.get(j)); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java index 8b02416d2be..f8223e03645 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/StripedReplicaPlacerTest.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -118,9 +119,9 @@ public class StripedReplicaPlacerTest { public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() { MockRandom random = new MockRandom(); StripedReplicaPlacer placer = new StripedReplicaPlacer(random); - assertEquals(new TopicAssignment(Arrays.asList(new PartitionAssignment(Arrays.asList(0)), - new PartitionAssignment(Arrays.asList(0)), - new PartitionAssignment(Arrays.asList(0)))), + assertEquals(new TopicAssignment(Arrays.asList(partitionAssignment(Arrays.asList(0)), + partitionAssignment(Arrays.asList(0)), + partitionAssignment(Arrays.asList(0)))), place(placer, 0, 3, (short) 1, Arrays.asList( new UsableBroker(0, Optional.empty(), false), new UsableBroker(1, Optional.empty(), true)))); @@ -224,11 +225,11 @@ public class StripedReplicaPlacerTest { public void testSuccessfulPlacement() { MockRandom random = new MockRandom(); StripedReplicaPlacer placer = new StripedReplicaPlacer(random); - assertEquals(new TopicAssignment(Arrays.asList(new PartitionAssignment(Arrays.asList(2, 3, 0)), - new PartitionAssignment(Arrays.asList(3, 0, 1)), - new PartitionAssignment(Arrays.asList(0, 1, 2)), - new PartitionAssignment(Arrays.asList(1, 2, 3)), - new PartitionAssignment(Arrays.asList(1, 0, 2)))), + assertEquals(new TopicAssignment(Arrays.asList(partitionAssignment(Arrays.asList(2, 3, 0)), + partitionAssignment(Arrays.asList(3, 0, 1)), + partitionAssignment(Arrays.asList(0, 1, 2)), + partitionAssignment(Arrays.asList(1, 2, 3)), + partitionAssignment(Arrays.asList(1, 0, 2)))), place(placer, 0, 5, (short) 3, Arrays.asList( new UsableBroker(0, Optional.empty(), false), new UsableBroker(3, Optional.empty(), false), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java index 7b5a24c3b84..26f8841d834 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/placement/TopicAssignmentTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.metadata.placement; +import static org.apache.kafka.metadata.placement.PartitionAssignmentTest.partitionAssignment; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -33,8 +34,8 @@ public class TopicAssignmentTest { List<Integer> replicasP0 = Arrays.asList(0, 1, 2); List<Integer> replicasP1 = Arrays.asList(1, 2, 0); List<PartitionAssignment> partitionAssignments = Arrays.asList( - new PartitionAssignment(replicasP0), - new PartitionAssignment(replicasP1) + partitionAssignment(replicasP0), + partitionAssignment(replicasP1) ); assertEquals(partitionAssignments, new TopicAssignment(partitionAssignments).assignments()); } @@ -44,14 +45,14 @@ public class TopicAssignmentTest { List<TopicAssignment> topicAssignments = Arrays.asList( new TopicAssignment( Arrays.asList( - new PartitionAssignment( + partitionAssignment( Arrays.asList(0, 1, 2) ) ) ), new TopicAssignment( Arrays.asList( - new PartitionAssignment( + partitionAssignment( Arrays.asList(1, 2, 0) ) )