This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 71a4e6fc0ce KAFKA-15140: improve TopicCommandIntegrationTest to be less flaky (#14891) 71a4e6fc0ce is described below commit 71a4e6fc0ce43e907c320ed5afca95b84620b0e8 Author: Owen Leung <owen.leu...@gmail.com> AuthorDate: Mon Feb 19 19:37:31 2024 +0800 KAFKA-15140: improve TopicCommandIntegrationTest to be less flaky (#14891) This PR improves TopicCommandIntegrationTest by: - using TestUtils.createTopicWithAdmin - replacing \n with lineSeparator - using waitForAllReassignmentsToComplete - adding more log output when an assertion fails Reviewers: Luke Chen <show...@gmail.com>, Justine Olshan <jols...@confluent.io> --- .../kafka/tools/TopicCommandIntegrationTest.java | 526 ++++++++++++--------- 1 file changed, 298 insertions(+), 228 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java index b934e04012c..fa1d8ea8c51 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.tools; import kafka.admin.RackAwareTest; +import kafka.server.ControllerServer; import kafka.server.KafkaBroker; import kafka.server.KafkaConfig; import kafka.utils.Logging; @@ -28,7 +29,6 @@ import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult; import org.apache.kafka.clients.admin.NewPartitionReassignment; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.PartitionReassignment; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; @@ -38,7 +38,6 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import 
org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.ClusterAuthorizationException; -import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.message.MetadataResponseData; @@ -53,6 +52,8 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import scala.collection.JavaConverters; +import scala.collection.mutable.Buffer; +import scala.collection.Seq; import java.util.ArrayList; import java.util.Arrays; @@ -66,6 +67,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -82,13 +84,16 @@ import static org.mockito.Mockito.spy; @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest { private short defaultReplicationFactor = 1; - private int numPartitions = 1; + private int defaultNumPartitions = 1; private TopicCommand.TopicService topicService; private Admin adminClient; private String bootstrapServer; private String testTopicName; private long defaultTimeout = 10000; + private Buffer<KafkaBroker> scalaBrokers; + private Seq<ControllerServer> scalaControllers; + /** * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every * test and should not reuse previous configurations unless they select their ports randomly when servers are started. 
@@ -107,7 +112,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe rackInfo.put(5, "rack3"); List<Properties> brokerConfigs = ToolsTestUtils - .createBrokerProperties(6, zkConnectOrNull(), rackInfo, numPartitions, defaultReplicationFactor); + .createBrokerProperties(6, zkConnectOrNull(), rackInfo, defaultNumPartitions, defaultReplicationFactor); List<KafkaConfig> configs = new ArrayList<>(); for (Properties props : brokerConfigs) { @@ -124,19 +129,6 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe return new TopicCommand.TopicCommandOptions(finalOptions); } - private void createAndWaitTopic(TopicCommand.TopicCommandOptions opts) throws Exception { - topicService.createTopic(opts); - waitForTopicCreated(opts.topic().get()); - } - - private void waitForTopicCreated(String topicName) { - waitForTopicCreated(topicName, defaultTimeout); - } - - private void waitForTopicCreated(String topicName, long timeout) { - TestUtils.waitForPartitionMetadata(brokers(), topicName, 0, timeout); - } - @BeforeEach public void setUp(TestInfo info) { super.setUp(info); @@ -147,7 +139,10 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe adminClient = Admin.create(props); topicService = new TopicCommand.TopicService(props, Optional.of(bootstrapServer)); testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), TestUtils.randomString(10)); + scalaBrokers = brokers(); + scalaControllers = controllerServers(); } + @AfterEach public void close() throws Exception { if (topicService != null) @@ -159,47 +154,51 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreate(String quorum) throws Exception { - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( - "--create", "--partitions", "2", 
"--replication-factor", "1", "--topic", testTopicName)); - - assertTrue(adminClient.listTopics().names().get().contains(testTopicName)); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 1, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + assertTrue(adminClient.listTopics().names().get().contains(testTopicName), + "Admin client didn't see the created topic. It saw: " + adminClient.listTopics().names().get()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithDefaults(String quorum) throws Exception { - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName)); - + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); List<TopicPartitionInfo> partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) .allTopicNames() .get() .get(testTopicName) .partitions(); - assertEquals(numPartitions, partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size()); + assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithDefaultReplication(String quorum) throws Exception { - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--partitions", "2")); - + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, defaultReplicationFactor, + 
scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); List<TopicPartitionInfo> partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) .allTopicNames() .get() .get(testTopicName) .partitions(); - assertEquals(2, partitions.size()); - assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size()); + assertEquals(2, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(defaultReplicationFactor, (short) partitions.get(0).replicas().size(), "Unequal replication factor: " + partitions.get(0).replicas().size()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithDefaultPartitions(String quorum) throws Exception { - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--replication-factor", "2")); - + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, 2, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); List<TopicPartitionInfo> partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) .allTopicNames() @@ -207,52 +206,65 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe .get(testTopicName) .partitions(); - assertEquals(numPartitions, partitions.size()); - assertEquals(2, (short) partitions.get(0).replicas().size()); + assertEquals(defaultNumPartitions, partitions.size(), "Unequal partition size: " + partitions.size()); + assertEquals(2, (short) partitions.get(0).replicas().size(), "Partitions not replicated: " + partitions.get(0).replicas().size()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithConfigs(String quorum) throws Exception { ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName); - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", - "delete.retention.ms=1000")); + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000"); + + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 2, 2, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value())); + assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value()), + "Config not set correctly: " + configs.get("delete.retention.ms").value()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWhenAlreadyExists(String quorum) throws Exception { - int numPartitions = 1; - // create the topic TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( - "--create", "--partitions", Integer.toString(numPartitions), "--replication-factor", "1", + "--create", "--partitions", Integer.toString(defaultNumPartitions), "--replication-factor", "1", "--topic", testTopicName); - createAndWaitTopic(createOpts); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // try to re-create the topic - assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts)); + assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts), + "Expected TopicExistsException to throw"); } @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception { + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--if-not-exists"); - createAndWaitTopic(createOpts); topicService.createTopic(createOpts); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithReplicaAssignment(String quorum) throws Exception { - // create the topic - TopicCommand.TopicCommandOptions createOpts = - buildTopicCommandOptionsWithBootstrap("--create", "--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName); - createAndWaitTopic(createOpts); + scala.collection.mutable.HashMap<Object, Seq<Object>> replicaAssignmentMap = new scala.collection.mutable.HashMap<>(); + + replicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object) 4)).asScala().toSeq()); + replicaAssignmentMap.put(1, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object) 2)).asScala().toSeq()); + replicaAssignmentMap.put(2, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 0)).asScala().toSeq()); + + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, + defaultReplicationFactor, replicaAssignmentMap, new Properties() + ); List<TopicPartitionInfo> partitions = adminClient .describeTopics(Collections.singletonList(testTopicName)) @@ -261,10 +273,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe .get(testTopicName) .partitions(); - assertEquals(3, 
partitions.size()); - assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0)); - assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1)); - assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2)); + assertEquals(3, partitions.size(), + "Unequal partition size: " + partitions.size()); + assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 0)); + assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 1)); + assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2), + "Unexpected replica assignment: " + getPartitionReplicas(partitions, 2)); } private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> partitions, int partitionNumber) { @@ -276,7 +292,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe public void testCreateWithInvalidReplicationFactor(String quorum) { TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -284,14 +300,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe public void testCreateWithNegativeReplicationFactor(String quorum) { TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + 
assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithNegativePartitionCount(String quorum) { TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts), "Expected IllegalArgumentException to throw"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -300,17 +316,18 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, "--config", "message.timestamp.type=boom"); - assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts)); + assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts), "Expected ConfigException to throw"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testListTopics(String quorum) throws Exception { - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( - "--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list")); - assertTrue(output.contains(testTopicName)); + 
assertTrue(output.contains(testTopicName), "Expected topic name to be present in output: " + output); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -319,107 +336,124 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe String topic1 = "kafka.testTopic1"; String topic2 = "kafka.testTopic2"; String topic3 = "oooof.testTopic1"; - adminClient.createTopics( - Arrays.asList(new NewTopic(topic1, 2, (short) 2), - new NewTopic(topic2, 2, (short) 2), - new NewTopic(topic3, 2, (short) 2))) - .all().get(); - waitForTopicCreated(topic1); - waitForTopicCreated(topic2); - waitForTopicCreated(topic3); + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--topic", "kafka.*")); - assertTrue(output.contains(topic1)); - assertTrue(output.contains(topic2)); - assertFalse(output.contains(topic3)); + assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); + assertTrue(output.contains(topic2), "Expected topic name " + topic2 + " to be present in output: " + output); + assertFalse(output.contains(topic3), "Do not expect topic name " + topic3 + " to be present in output: " + output); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void 
testListTopicsWithExcludeInternal(String quorum) throws ExecutionException, InterruptedException { String topic1 = "kafka.testTopic1"; - adminClient.createTopics( - Arrays.asList(new NewTopic(topic1, 2, (short) 2), - new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, (short) 2))) - .all().get(); - waitForTopicCreated(topic1); + String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME; + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, hiddenConsumerTopic, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); - assertTrue(output.contains(topic1)); - assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + assertTrue(output.contains(topic1), "Expected topic name " + topic1 + " to be present in output: " + output); + assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic name " + hiddenConsumerTopic + " to be present in output: " + output); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testAlterPartitionCount(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); - waitForTopicCreated(testTopicName); - + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName, "--partitions", "3")); + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); kafka.utils.TestUtils.waitUntilTrue( () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), () -> "Timeout waiting for new assignment propagating to broker", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); - assertEquals(3, topicDescription.partitions().size()); + assertEquals(3, topicDescription.partitions().size(), "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testAlterAssignment(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); - waitForTopicCreated(testTopicName); - + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")); + + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); kafka.utils.TestUtils.waitUntilTrue( () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), () -> "Timeout waiting for new assignment propagating to broker", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); - 
assertTrue(topicDescription.partitions().size() == 3); + assertTrue(topicDescription.partitions().size() == 3, "Expected partition count to be 3. Got: " + topicDescription.partitions().size()); List<Integer> partitionReplicas = getPartitionReplicas(topicDescription.partitions(), 2); - assertEquals(Arrays.asList(4, 2), partitionReplicas); + assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have replicas 4,2. Got: " + partitionReplicas); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testAlterAssignmentWithMoreAssignmentThanPartitions(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); - waitForTopicCreated(testTopicName); - + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); assertThrows(ExecutionException.class, () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", - "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"))); + "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3")), + "Expected to fail with ExecutionException"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testAlterAssignmentWithMorePartitionsThanAssignment(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); - waitForTopicCreated(testTopicName); + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, 
replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); assertThrows(ExecutionException.class, () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, - "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"))); + "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")), + "Expected to fail with ExecutionException"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testAlterWithInvalidPartitionCount(String quorum) throws Exception { - createAndWaitTopic( - buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName) + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() ); - assertThrows(ExecutionException.class, - () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", "-1", "--topic", testTopicName))); + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", "-1", "--topic", testTopicName)), + "Expected to fail with ExecutionException"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -428,7 +462,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe // alter a topic that does not exist without --if-exists TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1"); TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); - assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts)); + assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts), "Expected to fail with 
IllegalArgumentException"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -450,11 +484,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe int numPartitions = 18; int replicationFactor = 3; - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", Integer.toString(numPartitions), - "--replication-factor", Integer.toString(replicationFactor), - "--topic", testTopicName); - createAndWaitTopic(createOpts); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, numPartitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); Map<Integer, List<Integer>> assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) .allTopicNames().get().get(testTopicName).partitions() @@ -472,6 +504,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe "--topic", testTopicName); topicService.alterTopic(alterOpts); + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); kafka.utils.TestUtils.waitUntilTrue( () -> brokers().forall(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == alteredNumPartitions), () -> "Timeout waiting for new assignment propagating to broker", @@ -487,22 +520,18 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testConfigPreservationAcrossPartitionAlteration(String quorum) throws Exception { - int numPartitionsOriginal = 1; - String cleanupKey = "cleanup.policy"; - String cleanupVal = "compact"; + String cleanUpPolicy = "compact"; + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, 
scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); - // create the topic - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", Integer.toString(numPartitionsOriginal), - "--replication-factor", "1", - "--config", cleanupKey + "=" + cleanupVal, - "--topic", testTopicName); - createAndWaitTopic(createOpts); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); Config props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) - assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey); - assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value"); + assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Properties after creation don't contain " + cleanUpPolicy); + assertEquals(cleanUpPolicy, props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after creation have incorrect value"); // pre-create the topic config changes path to avoid a NoNodeException if (!isKRaftTest()) { @@ -514,21 +543,20 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", Integer.toString(numPartitionsModified), "--topic", testTopicName); topicService.alterTopic(alterOpts); + + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); Config newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); - assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey); - assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value"); 
+ assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), "Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG); + assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value"); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testTopicDeletion(String quorum) throws Exception { // create the NormalTopic - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "1", - "--replication-factor", "1", - "--topic", testTopicName); - createAndWaitTopic(createOpts); - + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // delete the NormalTopic TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); @@ -545,12 +573,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe public void testTopicWithCollidingCharDeletionAndCreateAgain(String quorum) throws Exception { // create the topic with colliding chars String topicWithCollidingChar = "test.a"; - TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "1", - "--replication-factor", "1", - "--topic", topicWithCollidingChar); - createAndWaitTopic(createOpts); - + TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // delete the topic TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", topicWithCollidingChar); @@ -560,19 +585,18 @@ public class 
TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe } topicService.deleteTopic(deleteOpts); TestUtils.verifyTopicDeletion(zkClientOrNull(), topicWithCollidingChar, 1, brokers()); - assertDoesNotThrow(() -> createAndWaitTopic(createOpts)); + assertDoesNotThrow(() -> TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ), "Should be able to create a topic with colliding chars after deletion."); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDeleteInternalTopic(String quorum) throws Exception { // create the offset topic - TopicCommand.TopicCommandOptions createOffsetTopicOpts = buildTopicCommandOptionsWithBootstrap("--create", - "--partitions", "1", - "--replication-factor", "1", - "--topic", Topic.GROUP_METADATA_TOPIC_NAME); - createAndWaitTopic(createOffsetTopicOpts); - + TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed by default. // This is a difference between the new and the old command as the old one didn't allow internal topic deletion. // If deleting internal topics is not desired, ACLS should be used to control it. 
@@ -583,7 +607,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe assertFalse(zkClient().pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion."); } topicService.deleteTopic(deleteOffsetTopicOpts); - TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers()); + TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers()); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -591,7 +615,8 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe public void testDeleteWhenTopicDoesntExist(String quorum) { // delete a topic that does not exist TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); - assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts)); + assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts), + "Expected an exception when trying to delete a topic that does not exist."); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -603,21 +628,23 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribe(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); - waitForTopicCreated(testTopicName); - + int partition = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partition, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] rows = output.split("\n"); - assertEquals(3, rows.length); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + String[] rows = output.split(System.lineSeparator()); + assertEquals(3, rows.length, "Expected 3 rows in output, got " + rows.length); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeWhenTopicDoesntExist(String quorum) { assertThrows(IllegalArgumentException.class, - () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName))); + () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)), + "Expected an exception when trying to describe a topic that does not exist."); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -629,10 +656,11 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeUnavailablePartitions(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 6, (short) 1))).all().get(); - waitForTopicCreated(testTopicName); - + int partitions = 6; + short replicationFactor = 1; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); try { // check which partition is on broker 0 which we'll kill TopicDescription testTopicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)) @@ -678,9 +706,11 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe // grab the console output and assert String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--unavailable-partitions")); - String[] rows = output.split("\n"); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); - assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:")); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected Topic " + rows[0] + " received. Expect " + String.format("Topic: %s", testTopicName)); + assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"), + "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'"); } finally { restartDeadBrokers(false); } @@ -689,10 +719,11 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeUnderReplicatedPartitions(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, (short) 6))).all().get(); - waitForTopicCreated(testTopicName); - + int partitions = 1; + short replicationFactor = 6; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); try { killBroker(0); if (isKRaftTest()) { @@ -701,7 +732,7 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe TestUtils.waitForPartitionMetadata(aliveBrokers(), testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS); } String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); - String[] rows = output.split("\n"); + String[] rows = output.split(System.lineSeparator()); assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), String.format("Unexpected output: %s", rows[0])); } finally { restartDeadBrokers(false); @@ -711,13 +742,13 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeUnderMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException { - Map<String, String> configMap = new HashMap<>(); - configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); - - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, (short) 6).configs(configMap))).all().get(); - waitForTopicCreated(testTopicName); - + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + int partitions = 1; + short replicationFactor = 6; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); try { killBroker(0); if (isKRaftTest()) { @@ -730,8 +761,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe ); } String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); - String[] rows = output.split("\n"); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected topic: " + rows[0]); } finally { restartDeadBrokers(false); } @@ -740,15 +772,11 @@ public class 
TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException { - Map<String, String> configMap = new HashMap<>(); - short replicationFactor = 1; - int partitions = 1; TopicPartition tp = new TopicPartition(testTopicName, 0); - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap)) - ).all().get(); - waitForTopicCreated(testTopicName); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // Produce multiple batches. TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); @@ -785,19 +813,36 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe () -> "Reassignment didn't add the second node", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + ensureConsistentKRaftMetadata(); + // describe the topic and test if it's under-replicated String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] simpleDescribeOutputRows = simpleDescribeOutput.split("\n"); - assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName))); - assertEquals(2, simpleDescribeOutputRows.length); + String[] simpleDescribeOutputRows = simpleDescribeOutput.split(System.lineSeparator()); + assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected describe output: " + simpleDescribeOutputRows[0]); + assertEquals(2, simpleDescribeOutputRows.length, + "Unexpected describe output 
length: " + simpleDescribeOutputRows.length); String underReplicatedOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); assertEquals("", underReplicatedOutput, String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); - // Verify reassignment is still ongoing. - PartitionReassignment reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); - assertFalse(reassignments.addingReplicas().isEmpty()); + int maxRetries = 20; + long pause = 100L; + long waitTimeMs = maxRetries * pause; + AtomicReference<PartitionReassignment> reassignmentsRef = new AtomicReference<>(); + + TestUtils.waitUntilTrue(() -> { + try { + PartitionReassignment tempReassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); + reassignmentsRef.set(tempReassignments); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Error while fetching reassignments", e); + } + return reassignmentsRef.get() != null; + }, () -> "Reassignments did not become non-null within the specified time", waitTimeMs, pause); + + assertFalse(reassignmentsRef.get().addingReplicas().isEmpty()); ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp)); TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); @@ -806,13 +851,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeAtMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException { - Map<String, String> configMap = new HashMap<>(); - configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4"); - - adminClient.createTopics( - Collections.singletonList(new 
NewTopic(testTopicName, 1, (short) 6).configs(configMap))).all().get(); - waitForTopicCreated(testTopicName); + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4"); + int partitions = 1; + short replicationFactor = 6; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); try { killBroker(0); killBroker(1); @@ -828,8 +874,9 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe } String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--at-min-isr-partitions")); - String[] rows = output.split("\n"); - assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), + "Unexpected output: " + rows[0]); assertEquals(1, rows.length); } finally { restartDeadBrokers(false); @@ -853,21 +900,32 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe String offlineTopic = "offline-topic"; String fullyReplicatedTopic = "fully-replicated-topic"; - Map<String, String> configMap = new HashMap<>(); - configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + scala.collection.mutable.HashMap<Object, Seq<Object>> fullyReplicatedReplicaAssignmentMap = new scala.collection.mutable.HashMap<>(); + fullyReplicatedReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 2, (Object) 3)).asScala().toSeq()); - adminClient.createTopics( - java.util.Arrays.asList( - new NewTopic(underMinIsrTopic, 1, (short) 6).configs(configMap), - new NewTopic(notUnderMinIsrTopic, 1, (short) 6), - new NewTopic(offlineTopic, Collections.singletonMap(0, Collections.singletonList(0))), - new NewTopic(fullyReplicatedTopic, 
Collections.singletonMap(0, java.util.Arrays.asList(1, 2, 3)))) - ).all().get(); + scala.collection.mutable.HashMap<Object, Seq<Object>> offlineReplicaAssignmentMap = new scala.collection.mutable.HashMap<>(); + offlineReplicaAssignmentMap.put(0, JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 0)).asScala().toSeq()); + + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + + int partitions = 1; + short replicationFactor = 6; + int negativePartition = -1; + short negativeReplicationFactor = -1; + TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); + TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, offlineTopic, scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor, + offlineReplicaAssignmentMap, new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor, + fullyReplicatedReplicaAssignmentMap, new Properties() + ); - waitForTopicCreated(underMinIsrTopic); - waitForTopicCreated(notUnderMinIsrTopic); - waitForTopicCreated(offlineTopic); - waitForTopicCreated(fullyReplicatedTopic); try { killBroker(0); @@ -881,10 +939,13 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe () -> "Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); } + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", 
"--under-min-isr-partitions")); - String[] rows = output.split("\n"); - assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic))); - assertTrue(rows[1].startsWith(String.format("\tTopic: %s", offlineTopic))); + String[] rows = output.split(System.lineSeparator()); + assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic)), + "Unexpected output: " + rows[0]); + assertTrue(rows[1].startsWith(String.format("\tTopic: %s", offlineTopic)), + "Unexpected output: " + rows[1]); assertEquals(2, rows.length); } finally { restartDeadBrokers(false); @@ -895,8 +956,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ValueSource(strings = {"zk", "kraft"}) public void testDescribeReportOverriddenConfigs(String quorum) throws Exception { String config = "file.delete.delay.ms=1000"; - createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", - "--replication-factor", "2", "--topic", testTopicName, "--config", config)); + Properties topicConfig = new Properties(); + topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000"); + + int partitions = 2; + short replicationFactor = 2; + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), topicConfig + ); String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe")); assertTrue(output.contains(config), String.format("Describe output should have contained %s", config)); } @@ -904,22 +971,24 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) throws Exception { - createAndWaitTopic( - buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", 
"--replication-factor", "1", "--topic", testTopicName)); - // create a internal topic - createAndWaitTopic( - buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", Topic.GROUP_METADATA_TOPIC_NAME)); - + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + TestUtils.createTopicWithAdmin(adminClient, Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); // test describe String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--describe", "--exclude-internal")); assertTrue(output.contains(testTopicName), String.format("Output should have contained %s", testTopicName)); - assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), + "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); // test list output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); - assertTrue(output.contains(testTopicName)); - assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + assertTrue(output.contains(testTopicName), String.format("Output should have contained %s", testTopicName)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME), + "Output should not have contained " + Topic.GROUP_METADATA_TOPIC_NAME); } @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -936,13 +1005,12 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe topicService = new TopicCommand.TopicService(adminClient); - adminClient.createTopics( - Collections.singletonList(new NewTopic(testTopicName, 1, (short) 1)) - 
).all().get(); - waitForTopicCreated(testTopicName); + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); - String[] rows = output.split("\n"); + String[] rows = output.split(System.lineSeparator()); assertEquals(2, rows.length, "Unexpected output: " + output); assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Unexpected output: " + rows[0]); } @@ -950,12 +1018,14 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) @ValueSource(strings = {"zk", "kraft"}) public void testCreateWithTopicNameCollision(String quorum) throws ExecutionException, InterruptedException { - adminClient.createTopics( - Collections.singletonList(new NewTopic("foo_bar", 1, (short) 6))).all().get(); - waitForTopicCreated("foo_bar"); - - assertThrows(InvalidTopicException.class, - () -> topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", "foo.bar"))); + String topic = "foo_bar"; + int partitions = 1; + short replicationFactor = 6; + TestUtils.createTopicWithAdmin(adminClient, topic, scalaBrokers, scalaControllers, partitions, replicationFactor, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + assertThrows(TopicExistsException.class, + () -> topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", topic))); } private void checkReplicaDistribution(Map<Integer, List<Integer>> assignment,