This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71a4e6fc0ce KAFKA-15140: improve TopicCommandIntegrationTest to be 
less flaky (#14891)
71a4e6fc0ce is described below

commit 71a4e6fc0ce43e907c320ed5afca95b84620b0e8
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Mon Feb 19 19:37:31 2024 +0800

    KAFKA-15140: improve TopicCommandIntegrationTest to be less flaky (#14891)
    
    This PR improves TopicCommandIntegrationTest by:
        - using TestUtils.createTopicWithAdmin
        - replacing \n with lineSeparator
        - using waitForAllReassignmentsToComplete
        - adding more logging when an assertion fails
    
    Reviewers: Luke Chen <show...@gmail.com>, Justine Olshan 
<jols...@confluent.io>
---
 .../kafka/tools/TopicCommandIntegrationTest.java   | 526 ++++++++++++---------
 1 file changed, 298 insertions(+), 228 deletions(-)

diff --git 
a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java
index b934e04012c..fa1d8ea8c51 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.tools;
 
 import kafka.admin.RackAwareTest;
+import kafka.server.ControllerServer;
 import kafka.server.KafkaBroker;
 import kafka.server.KafkaConfig;
 import kafka.utils.Logging;
@@ -28,7 +29,6 @@ import org.apache.kafka.clients.admin.AdminClientTestUtils;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
-import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.PartitionReassignment;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
@@ -38,7 +38,6 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
-import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.message.MetadataResponseData;
@@ -53,6 +52,8 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import scala.collection.JavaConverters;
+import scala.collection.mutable.Buffer;
+import scala.collection.Seq;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -66,6 +67,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -82,13 +84,16 @@ import static org.mockito.Mockito.spy;
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
 public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
     private short defaultReplicationFactor = 1;
-    private int numPartitions = 1;
+    private int defaultNumPartitions = 1;
     private TopicCommand.TopicService topicService;
     private Admin adminClient;
     private String bootstrapServer;
     private String testTopicName;
     private long defaultTimeout = 10000;
 
+    private Buffer<KafkaBroker> scalaBrokers;
+    private Seq<ControllerServer> scalaControllers;
+
     /**
      * Implementations must override this method to return a set of 
KafkaConfigs. This method will be invoked for every
      * test and should not reuse previous configurations unless they select 
their ports randomly when servers are started.
@@ -107,7 +112,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         rackInfo.put(5, "rack3");
 
         List<Properties> brokerConfigs = ToolsTestUtils
-            .createBrokerProperties(6, zkConnectOrNull(), rackInfo, 
numPartitions, defaultReplicationFactor);
+            .createBrokerProperties(6, zkConnectOrNull(), rackInfo, 
defaultNumPartitions, defaultReplicationFactor);
 
         List<KafkaConfig> configs = new ArrayList<>();
         for (Properties props : brokerConfigs) {
@@ -124,19 +129,6 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         return new TopicCommand.TopicCommandOptions(finalOptions);
     }
 
-    private void createAndWaitTopic(TopicCommand.TopicCommandOptions opts) 
throws Exception {
-        topicService.createTopic(opts);
-        waitForTopicCreated(opts.topic().get());
-    }
-
-    private void waitForTopicCreated(String topicName) {
-        waitForTopicCreated(topicName, defaultTimeout);
-    }
-
-    private void waitForTopicCreated(String topicName, long timeout) {
-        TestUtils.waitForPartitionMetadata(brokers(), topicName, 0, timeout);
-    }
-
     @BeforeEach
     public void setUp(TestInfo info) {
         super.setUp(info);
@@ -147,7 +139,10 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         adminClient = Admin.create(props);
         topicService = new TopicCommand.TopicService(props, 
Optional.of(bootstrapServer));
         testTopicName = String.format("%s-%s", 
info.getTestMethod().get().getName(), TestUtils.randomString(10));
+        scalaBrokers = brokers();
+        scalaControllers = controllerServers();
     }
+
     @AfterEach
     public void close() throws Exception {
         if (topicService != null)
@@ -159,47 +154,51 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreate(String quorum) throws Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", "2", "--replication-factor", "1", 
"--topic", testTopicName));
-
-        
assertTrue(adminClient.listTopics().names().get().contains(testTopicName));
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, 1,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        
assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+                "Admin client didn't see the created topic. It saw: " + 
adminClient.listTopics().names().get());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithDefaults(String quorum) throws Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", testTopicName));
-
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         List<TopicPartitionInfo> partitions = adminClient
             .describeTopics(Collections.singletonList(testTopicName))
             .allTopicNames()
             .get()
             .get(testTopicName)
             .partitions();
-        assertEquals(numPartitions, partitions.size());
-        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size());
+        assertEquals(defaultNumPartitions, partitions.size(), "Unequal 
partition size: " + partitions.size());
+        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithDefaultReplication(String quorum) throws 
Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", testTopicName, "--partitions", "2"));
-
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         List<TopicPartitionInfo>  partitions = adminClient
             .describeTopics(Collections.singletonList(testTopicName))
             .allTopicNames()
             .get()
             .get(testTopicName)
             .partitions();
-        assertEquals(2, partitions.size());
-        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size());
+        assertEquals(2, partitions.size(), "Unequal partition size: " + 
partitions.size());
+        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithDefaultPartitions(String quorum) throws 
Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", testTopicName, "--replication-factor", "2"));
-
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, 2,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         List<TopicPartitionInfo> partitions = adminClient
             .describeTopics(Collections.singletonList(testTopicName))
             .allTopicNames()
@@ -207,52 +206,65 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             .get(testTopicName)
             .partitions();
 
-        assertEquals(numPartitions, partitions.size());
-        assertEquals(2, (short) partitions.get(0).replicas().size());
+        assertEquals(defaultNumPartitions, partitions.size(), "Unequal 
partition size: " + partitions.size());
+        assertEquals(2, (short) partitions.get(0).replicas().size(), 
"Partitions not replicated: " + partitions.get(0).replicas().size());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithConfigs(String quorum) throws Exception {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, 
"--config",
-                "delete.retention.ms=1000"));
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
+
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, 2,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
 
         Config configs = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
-        assertEquals(1000, 
Integer.valueOf(configs.get("delete.retention.ms").value()));
+        assertEquals(1000, 
Integer.valueOf(configs.get("delete.retention.ms").value()),
+                "Config not set correctly: " + 
configs.get("delete.retention.ms").value());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWhenAlreadyExists(String quorum) throws Exception {
-        int numPartitions = 1;
-
         // create the topic
         TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", Integer.toString(numPartitions), 
"--replication-factor", "1",
+            "--create", "--partitions", 
Integer.toString(defaultNumPartitions), "--replication-factor", "1",
                 "--topic", testTopicName);
-        createAndWaitTopic(createOpts);
 
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         // try to re-create the topic
-        assertThrows(TopicExistsException.class, () -> 
topicService.createTopic(createOpts));
+        assertThrows(TopicExistsException.class, () -> 
topicService.createTopic(createOpts),
+                "Expected TopicExistsException to throw");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) 
throws Exception {
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         TopicCommand.TopicCommandOptions createOpts =
                 buildTopicCommandOptionsWithBootstrap("--create", "--topic", 
testTopicName, "--if-not-exists");
-        createAndWaitTopic(createOpts);
         topicService.createTopic(createOpts);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithReplicaAssignment(String quorum) throws 
Exception {
-        // create the topic
-        TopicCommand.TopicCommandOptions createOpts =
-                buildTopicCommandOptionsWithBootstrap("--create", 
"--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName);
-        createAndWaitTopic(createOpts);
+        scala.collection.mutable.HashMap<Object, Seq<Object>> 
replicaAssignmentMap = new scala.collection.mutable.HashMap<>();
+
+        replicaAssignmentMap.put(0, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object) 
4)).asScala().toSeq());
+        replicaAssignmentMap.put(1, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object) 
2)).asScala().toSeq());
+        replicaAssignmentMap.put(2, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 
0)).asScala().toSeq());
+
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions,
+                defaultReplicationFactor, replicaAssignmentMap, new 
Properties()
+        );
 
         List<TopicPartitionInfo> partitions = adminClient
             .describeTopics(Collections.singletonList(testTopicName))
@@ -261,10 +273,14 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             .get(testTopicName)
             .partitions();
         
-        assertEquals(3, partitions.size());
-        assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0));
-        assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1));
-        assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2));
+        assertEquals(3, partitions.size(),
+                "Unequal partition size: " + partitions.size());
+        assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0),
+                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 0));
+        assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1),
+                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 1));
+        assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2),
+                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 2));
     }
 
     private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> 
partitions, int partitionNumber) {
@@ -276,7 +292,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     public void testCreateWithInvalidReplicationFactor(String quorum) {
         TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", 
"--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
             "--topic", testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts));
+        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -284,14 +300,14 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     public void testCreateWithNegativeReplicationFactor(String quorum) {
         TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create",
             "--partitions", "2", "--replication-factor", "-1", "--topic", 
testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts));
+        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithNegativePartitionCount(String quorum) {
         TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", 
"--replication-factor", "1", "--topic", testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts));
+        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -300,17 +316,18 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
             "--partitions", "1", "--replication-factor", "1", "--topic", 
testTopicName,
             "--config", "message.timestamp.type=boom");
-        assertThrows(ConfigException.class, () -> 
topicService.createTopic(createOpts));
+        assertThrows(ConfigException.class, () -> 
topicService.createTopic(createOpts), "Expected ConfigException to throw");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testListTopics(String quorum) throws Exception {
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", "1", "--replication-factor", "1", 
"--topic", testTopicName));
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
-        assertTrue(output.contains(testTopicName));
+        assertTrue(output.contains(testTopicName), "Expected topic name to be 
present in output: " + output);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -319,107 +336,124 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         String topic1 = "kafka.testTopic1";
         String topic2 = "kafka.testTopic2";
         String topic3 = "oooof.testTopic1";
-        adminClient.createTopics(
-                Arrays.asList(new NewTopic(topic1, 2, (short) 2),
-                    new NewTopic(topic2, 2, (short) 2),
-                    new NewTopic(topic3, 2, (short) 2)))
-            .all().get();
-        waitForTopicCreated(topic1);
-        waitForTopicCreated(topic2);
-        waitForTopicCreated(topic3);
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, 
scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers, 
scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers, 
scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--topic", "kafka.*"));
 
-        assertTrue(output.contains(topic1));
-        assertTrue(output.contains(topic2));
-        assertFalse(output.contains(topic3));
+        assertTrue(output.contains(topic1), "Expected topic name " + topic1 + 
" to be present in output: " + output);
+        assertTrue(output.contains(topic2), "Expected topic name " + topic2 + 
" to be present in output: " + output);
+        assertFalse(output.contains(topic3), "Do not expect topic name " + 
topic3 + " to be present in output: " + output);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testListTopicsWithExcludeInternal(String quorum) throws 
ExecutionException, InterruptedException {
         String topic1 = "kafka.testTopic1";
-        adminClient.createTopics(
-                Arrays.asList(new NewTopic(topic1, 2, (short) 2),
-                    new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, (short) 
2)))
-            .all().get();
-        waitForTopicCreated(topic1);
+        String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, 
scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, hiddenConsumerTopic, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--exclude-internal"));
 
-        assertTrue(output.contains(topic1));
-        assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME));
+        assertTrue(output.contains(topic1), "Expected topic name " + topic1 + 
" to be present in output: " + output);
+        assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic 
name " + hiddenConsumerTopic + " to be present in output: " + output);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testAlterPartitionCount(String quorum) throws 
ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Arrays.asList(new NewTopic(testTopicName, 2, (short) 
2))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName, "--partitions", "3"));
 
+        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
         kafka.utils.TestUtils.waitUntilTrue(
             () -> brokers().forall(b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
             () -> "Timeout waiting for new assignment propagating to broker", 
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
         TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
-        assertEquals(3, topicDescription.partitions().size());
+        assertEquals(3, topicDescription.partitions().size(), "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testAlterAssignment(String quorum) throws ExecutionException, 
InterruptedException {
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 2, (short) 
2))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
             "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "3"));
+
+        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
         kafka.utils.TestUtils.waitUntilTrue(
             () -> brokers().forall(b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
             () -> "Timeout waiting for new assignment propagating to broker",
             org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
 
         TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
-        assertTrue(topicDescription.partitions().size() == 3);
+        assertTrue(topicDescription.partitions().size() == 3, "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
         List<Integer> partitionReplicas = 
getPartitionReplicas(topicDescription.partitions(), 2);
-        assertEquals(Arrays.asList(4, 2), partitionReplicas);
+        assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have 
replicas 4,2. Got: " + partitionReplicas);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testAlterAssignmentWithMoreAssignmentThanPartitions(String 
quorum) throws ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Arrays.asList(new NewTopic(testTopicName, 2, (short) 
2))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         assertThrows(ExecutionException.class,
             () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
-                "--topic", testTopicName, "--replica-assignment", 
"5:3,3:1,4:2,3:2", "--partitions", "3")));
+                "--topic", testTopicName, "--replica-assignment", 
"5:3,3:1,4:2,3:2", "--partitions", "3")),
+                "Expected to fail with ExecutionException");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testAlterAssignmentWithMorePartitionsThanAssignment(String 
quorum) throws ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Arrays.asList(new NewTopic(testTopicName, 2, (short) 
2))).all().get();
-        waitForTopicCreated(testTopicName);
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         assertThrows(ExecutionException.class,
             () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName,
-                "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")));
+                "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")),
+                "Expected to fail with ExecutionException");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testAlterWithInvalidPartitionCount(String quorum) throws 
Exception {
-        createAndWaitTopic(
-                buildTopicCommandOptionsWithBootstrap("--create", 
"--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
         );
-
         assertThrows(ExecutionException.class,
-            () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--partitions", "-1", "--topic", testTopicName)));
+            () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--partitions", "-1", "--topic", testTopicName)),
+                "Expected to fail with ExecutionException");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -428,7 +462,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         // alter a topic that does not exist without --if-exists
         TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, 
"--partitions", "1");
         TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.alterTopic(alterOpts));
+        assertThrows(IllegalArgumentException.class, () -> 
topicService.alterTopic(alterOpts), "Expected to fail with 
IllegalArgumentException");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -450,11 +484,9 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
 
         int numPartitions = 18;
         int replicationFactor = 3;
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", Integer.toString(numPartitions),
-            "--replication-factor", Integer.toString(replicationFactor),
-            "--topic", testTopicName);
-        createAndWaitTopic(createOpts);
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, numPartitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         Map<Integer, List<Integer>> assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
             .allTopicNames().get().get(testTopicName).partitions()
@@ -472,6 +504,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             "--topic", testTopicName);
         topicService.alterTopic(alterOpts);
 
+        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
         kafka.utils.TestUtils.waitUntilTrue(
             () -> brokers().forall(p -> 
p.metadataCache().getTopicPartitions(testTopicName).size() == 
alteredNumPartitions),
             () -> "Timeout waiting for new assignment propagating to broker",
@@ -487,22 +520,18 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testConfigPreservationAcrossPartitionAlteration(String quorum) 
throws Exception {
-        int numPartitionsOriginal = 1;
-        String cleanupKey = "cleanup.policy";
-        String cleanupVal = "compact";
+        String cleanUpPolicy = "compact";
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
 
-        // create the topic
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", Integer.toString(numPartitionsOriginal),
-            "--replication-factor", "1",
-            "--config", cleanupKey + "=" + cleanupVal,
-            "--topic", testTopicName);
-        createAndWaitTopic(createOpts);
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
         Config props = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
         // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, 
testTopicName)
-        assertNotNull(props.get(cleanupKey), "Properties after creation don't 
contain " + cleanupKey);
-        assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties 
after creation have incorrect value");
+        assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Properties after creation don't contain " + cleanUpPolicy);
+        assertEquals(cleanUpPolicy, props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after creation have incorrect value");
 
         // pre-create the topic config changes path to avoid a NoNodeException
         if (!isKRaftTest()) {
@@ -514,21 +543,20 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter",
             "--partitions", Integer.toString(numPartitionsModified), 
"--topic", testTopicName);
         topicService.alterTopic(alterOpts);
+
+        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
         Config newProps = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
-        assertNotNull(newProps.get(cleanupKey), "Updated properties do not 
contain " + cleanupKey);
-        assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated 
properties have incorrect value");
+        assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG);
+        assertEquals(cleanUpPolicy, newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties have incorrect value");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testTopicDeletion(String quorum) throws Exception {
         // create the NormalTopic
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", "1",
-            "--replication-factor", "1",
-            "--topic", testTopicName);
-        createAndWaitTopic(createOpts);
-
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         // delete the NormalTopic
         TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
 
@@ -545,12 +573,9 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     public void testTopicWithCollidingCharDeletionAndCreateAgain(String 
quorum) throws Exception {
         // create the topic with colliding chars
         String topicWithCollidingChar = "test.a";
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", "1",
-            "--replication-factor", "1",
-            "--topic", topicWithCollidingChar);
-        createAndWaitTopic(createOpts);
-
+        TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         // delete the topic
         TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", 
topicWithCollidingChar);
 
@@ -560,19 +585,18 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         }
         topicService.deleteTopic(deleteOpts);
         TestUtils.verifyTopicDeletion(zkClientOrNull(), 
topicWithCollidingChar, 1, brokers());
-        assertDoesNotThrow(() -> createAndWaitTopic(createOpts));
+        assertDoesNotThrow(() -> TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+        ), "Should be able to create a topic with colliding chars after deletion.");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDeleteInternalTopic(String quorum) throws Exception {
         // create the offset topic
-        TopicCommand.TopicCommandOptions createOffsetTopicOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", "1",
-            "--replication-factor", "1",
-            "--topic", Topic.GROUP_METADATA_TOPIC_NAME);
-        createAndWaitTopic(createOffsetTopicOpts);
-
+        TestUtils.createTopicWithAdmin(adminClient, 
Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, 
defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed 
by default.
         // This is a difference between the new and the old command as the old 
one didn't allow internal topic deletion.
         // If deleting internal topics is not desired, ACLS should be used to 
control it.
@@ -583,7 +607,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             assertFalse(zkClient().pathExists(deleteOffsetTopicPath), "Delete 
path for topic shouldn't exist before deletion.");
         }
         topicService.deleteTopic(deleteOffsetTopicOpts);
-        TestUtils.verifyTopicDeletion(zkClientOrNull(), 
Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers());
+        TestUtils.verifyTopicDeletion(zkClientOrNull(), 
Topic.GROUP_METADATA_TOPIC_NAME, defaultNumPartitions, brokers());
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -591,7 +615,8 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     public void testDeleteWhenTopicDoesntExist(String quorum) {
         // delete a topic that does not exist
         TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.deleteTopic(deleteOpts));
+        assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts),
+                "Expected an exception when trying to delete a topic that does not exist.");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -603,21 +628,23 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribe(String quorum) throws ExecutionException, 
InterruptedException {
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 2, (short) 
2))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partition = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--topic", testTopicName));
-        String[] rows = output.split("\n");
-        assertEquals(3, rows.length);
-        assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)));
+        String[] rows = output.split(System.lineSeparator());
+        assertEquals(3, rows.length, "Expected 3 rows in output, got " + 
rows.length);
+        assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + 
rows[0]);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeWhenTopicDoesntExist(String quorum) {
         assertThrows(IllegalArgumentException.class,
-            () -> 
topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", 
"--topic", testTopicName)));
+            () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)),
+                "Expected an exception when trying to describe a topic that does not exist.");
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -629,10 +656,11 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeUnavailablePartitions(String quorum) throws 
ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 6, (short) 
1))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partitions = 6;
+        short replicationFactor = 1;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         try {
             // check which partition is on broker 0 which we'll kill
             TopicDescription testTopicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
@@ -678,9 +706,11 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
 
             // grab the console output and assert
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--topic", testTopicName, "--unavailable-partitions"));
-            String[] rows = output.split("\n");
-            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)));
-            assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"));
+            String[] rows = output.split(System.lineSeparator());
+            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
+                    "Unexpected Topic " + rows[0] + " received. Expect " + 
String.format("Topic: %s", testTopicName));
+            assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:"),
+                    "Rows did not contain 'Leader: none\tReplicas: 0\tIsr:'");
         } finally {
             restartDeadBrokers(false);
         }
@@ -689,10 +719,11 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeUnderReplicatedPartitions(String quorum) throws 
ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 
6))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        int partitions = 1;
+        short replicationFactor = 6;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         try {
             killBroker(0);
             if (isKRaftTest()) {
@@ -701,7 +732,7 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
                 TestUtils.waitForPartitionMetadata(aliveBrokers(), 
testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS);
             }
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-replicated-partitions"));
-            String[] rows = output.split("\n");
+            String[] rows = output.split(System.lineSeparator());
             assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)), String.format("Unexpected output: %s", rows[0]));
         } finally {
             restartDeadBrokers(false);
@@ -711,13 +742,13 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeUnderMinIsrPartitions(String quorum) throws 
ExecutionException, InterruptedException {
-        Map<String, String> configMap = new HashMap<>();
-        configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
-
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 
6).configs(configMap))).all().get();
-        waitForTopicCreated(testTopicName);
-
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+        int partitions = 1;
+        short replicationFactor = 6;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
         try {
             killBroker(0);
             if (isKRaftTest()) {
@@ -730,8 +761,9 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
                 );
             }
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-min-isr-partitions"));
-            String[] rows = output.split("\n");
-            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)));
+            String[] rows = output.split(System.lineSeparator());
+            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
+                    "Unexpected topic: " + rows[0]);
         } finally {
             restartDeadBrokers(false);
         }
@@ -740,15 +772,11 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String 
quorum) throws ExecutionException, InterruptedException {
-        Map<String, String> configMap = new HashMap<>();
-        short replicationFactor = 1;
-        int partitions = 1;
         TopicPartition tp = new TopicPartition(testTopicName, 0);
 
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, partitions, 
replicationFactor).configs(configMap))
-        ).all().get();
-        waitForTopicCreated(testTopicName);
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         // Produce multiple batches.
         TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1);
@@ -785,19 +813,36 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             () -> "Reassignment didn't add the second node",
             org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
 
+        ensureConsistentKRaftMetadata();
+
         // describe the topic and test if it's under-replicated
         String simpleDescribeOutput = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--topic", testTopicName));
-        String[] simpleDescribeOutputRows = simpleDescribeOutput.split("\n");
-        
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", 
testTopicName)));
-        assertEquals(2, simpleDescribeOutputRows.length);
+        String[] simpleDescribeOutputRows = 
simpleDescribeOutput.split(System.lineSeparator());
+        
assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
+                "Unexpected describe output: " + simpleDescribeOutputRows[0]);
+        assertEquals(2, simpleDescribeOutputRows.length,
+                "Unexpected describe output length: " + 
simpleDescribeOutputRows.length);
 
         String underReplicatedOutput = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-replicated-partitions"));
         assertEquals("", underReplicatedOutput,
             String.format("--under-replicated-partitions shouldn't return 
anything: '%s'", underReplicatedOutput));
 
-        // Verify reassignment is still ongoing.
-        PartitionReassignment reassignments = 
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
-        assertFalse(reassignments.addingReplicas().isEmpty());
+        int maxRetries = 20;
+        long pause = 100L;
+        long waitTimeMs = maxRetries * pause;
+        AtomicReference<PartitionReassignment> reassignmentsRef = new 
AtomicReference<>();
+
+        TestUtils.waitUntilTrue(() -> {
+            try {
+                PartitionReassignment tempReassignments = 
adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp);
+                reassignmentsRef.set(tempReassignments);
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException("Error while fetching reassignments", e);
+            }
+            return reassignmentsRef.get() != null;
+        }, () -> "Reassignments did not become non-null within the specified time", waitTimeMs, pause);
+
+        assertFalse(reassignmentsRef.get().addingReplicas().isEmpty());
 
         ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, 
brokerIds, Collections.singleton(tp));
         TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
@@ -806,13 +851,14 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeAtMinIsrPartitions(String quorum) throws 
ExecutionException, InterruptedException {
-        Map<String, String> configMap = new HashMap<>();
-        configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4");
-
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic(testTopicName, 1, (short) 
6).configs(configMap))).all().get();
-        waitForTopicCreated(testTopicName);
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4");
 
+        int partitions = 1;
+        short replicationFactor = 6;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
         try {
             killBroker(0);
             killBroker(1);
@@ -828,8 +874,9 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
             }
 
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--at-min-isr-partitions"));
-            String[] rows = output.split("\n");
-            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)));
+            String[] rows = output.split(System.lineSeparator());
+            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)),
+                    "Unexpected output: " + rows[0]);
             assertEquals(1, rows.length);
         } finally {
             restartDeadBrokers(false);
@@ -853,21 +900,32 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
         String offlineTopic = "offline-topic";
         String fullyReplicatedTopic = "fully-replicated-topic";
 
-        Map<String, String> configMap = new HashMap<>();
-        configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+        scala.collection.mutable.HashMap<Object, Seq<Object>> 
fullyReplicatedReplicaAssignmentMap = new scala.collection.mutable.HashMap<>();
+        fullyReplicatedReplicaAssignmentMap.put(0, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 2, 
(Object) 3)).asScala().toSeq());
 
-        adminClient.createTopics(
-            java.util.Arrays.asList(
-                new NewTopic(underMinIsrTopic, 1, (short) 
6).configs(configMap),
-                new NewTopic(notUnderMinIsrTopic, 1, (short) 6),
-                new NewTopic(offlineTopic, Collections.singletonMap(0, 
Collections.singletonList(0))),
-                new NewTopic(fullyReplicatedTopic, Collections.singletonMap(0, 
java.util.Arrays.asList(1, 2, 3))))
-        ).all().get();
+        scala.collection.mutable.HashMap<Object, Seq<Object>> 
offlineReplicaAssignmentMap = new scala.collection.mutable.HashMap<>();
+        offlineReplicaAssignmentMap.put(0, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 
0)).asScala().toSeq());
+
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6");
+
+        int partitions = 1;
+        short replicationFactor = 6;
+        int negativePartition = -1;
+        short negativeReplicationFactor = -1;
+        TestUtils.createTopicWithAdmin(adminClient, underMinIsrTopic, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
+        TestUtils.createTopicWithAdmin(adminClient, notUnderMinIsrTopic, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, offlineTopic, 
scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor,
+                offlineReplicaAssignmentMap, new Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, fullyReplicatedTopic, 
scalaBrokers, scalaControllers, negativePartition, negativeReplicationFactor,
+                fullyReplicatedReplicaAssignmentMap, new Properties()
+        );
 
-        waitForTopicCreated(underMinIsrTopic);
-        waitForTopicCreated(notUnderMinIsrTopic);
-        waitForTopicCreated(offlineTopic);
-        waitForTopicCreated(fullyReplicatedTopic);
 
         try {
             killBroker(0);
@@ -881,10 +939,13 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
                     () -> "Timeout waiting for partition metadata propagating 
to brokers for underMinIsrTopic topic",
                     org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
             }
+            TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
             String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--under-min-isr-partitions"));
-            String[] rows = output.split("\n");
-            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
underMinIsrTopic)));
-            assertTrue(rows[1].startsWith(String.format("\tTopic: %s", 
offlineTopic)));
+            String[] rows = output.split(System.lineSeparator());
+            assertTrue(rows[0].startsWith(String.format("Topic: %s", 
underMinIsrTopic)),
+                    "Unexpected output: " + rows[0]);
+            assertTrue(rows[1].startsWith(String.format("\tTopic: %s", 
offlineTopic)),
+                    "Unexpected output: " + rows[1]);
             assertEquals(2, rows.length);
         } finally {
             restartDeadBrokers(false);
@@ -895,8 +956,14 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeReportOverriddenConfigs(String quorum) throws 
Exception {
         String config = "file.delete.delay.ms=1000";
-        createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--partitions", "2",
-            "--replication-factor", "2", "--topic", testTopicName, "--config", 
config));
+        Properties topicConfig = new Properties();
+        topicConfig.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1000");
+
+        int partitions = 2;
+        short replicationFactor = 2;
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
+        );
         String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe"));
         assertTrue(output.contains(config), String.format("Describe output 
should have contained %s", config));
     }
@@ -904,22 +971,24 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) 
throws Exception {
-        createAndWaitTopic(
-            buildTopicCommandOptionsWithBootstrap("--create", "--partitions", 
"1", "--replication-factor", "1", "--topic", testTopicName));
-        // create a internal topic
-        createAndWaitTopic(
-            buildTopicCommandOptionsWithBootstrap("--create", "--partitions", 
"1", "--replication-factor", "1", "--topic", Topic.GROUP_METADATA_TOPIC_NAME));
-
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        TestUtils.createTopicWithAdmin(adminClient, 
Topic.GROUP_METADATA_TOPIC_NAME, scalaBrokers, scalaControllers, 
defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
         // test describe
         String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--describe", "--exclude-internal"));
         assertTrue(output.contains(testTopicName),
             String.format("Output should have contained %s", testTopicName));
-        assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME));
+        assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME),
+                "Output should not have contained " + 
Topic.GROUP_METADATA_TOPIC_NAME);
 
         // test list
         output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--exclude-internal"));
-        assertTrue(output.contains(testTopicName));
-        assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME));
+        assertTrue(output.contains(testTopicName), String.format("Output should have contained %s", testTopicName));
+        assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME),
+                "Output should not have contained " + 
Topic.GROUP_METADATA_TOPIC_NAME);
     }
 
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
@@ -936,13 +1005,12 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
 
         topicService = new TopicCommand.TopicService(adminClient);
 
-        adminClient.createTopics(
-                Collections.singletonList(new NewTopic(testTopicName, 1, 
(short) 1))
-        ).all().get();
-        waitForTopicCreated(testTopicName);
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
 
         String output = 
captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe",
 "--topic", testTopicName));
-        String[] rows = output.split("\n");
+        String[] rows = output.split(System.lineSeparator());
         assertEquals(2, rows.length, "Unexpected output: " + output);
         assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)), "Unexpected output: " + rows[0]);
     }
@@ -950,12 +1018,14 @@ public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTe
     @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
     @ValueSource(strings = {"zk", "kraft"})
     public void testCreateWithTopicNameCollision(String quorum) throws 
ExecutionException, InterruptedException {
-        adminClient.createTopics(
-            Collections.singletonList(new NewTopic("foo_bar", 1, (short) 
6))).all().get();
-        waitForTopicCreated("foo_bar");
-
-        assertThrows(InvalidTopicException.class,
-            () -> 
topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", "foo.bar")));
+        String topic = "foo_bar";
+        int partitions = 1;
+        short replicationFactor = 6;
+        TestUtils.createTopicWithAdmin(adminClient, topic, scalaBrokers, 
scalaControllers, partitions, replicationFactor,
+                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
+        );
+        assertThrows(TopicExistsException.class,
+            () -> 
topicService.createTopic(buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", topic)));
     }
 
     private void checkReplicaDistribution(Map<Integer, List<Integer>> 
assignment,

Reply via email to