TaiJuWu commented on code in PR #16127:
URL: https://github.com/apache/kafka/pull/16127#discussion_r1683113545


##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -86,652 +90,792 @@
 
 @Tag("integration")
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
-public class TopicCommandIntegrationTest extends 
kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest {
+@ExtendWith(ClusterTestExtensions.class)
+public class TopicCommandIntegrationTest {
     private final short defaultReplicationFactor = 1;
     private final int defaultNumPartitions = 1;
-    private TopicCommand.TopicService topicService;
-    private Admin adminClient;
-    private String bootstrapServer;
-    private String testTopicName;
-    private Buffer<KafkaBroker> scalaBrokers;
-    private Seq<ControllerServer> scalaControllers;
 
-    /**
-     * Implementations must override this method to return a set of 
KafkaConfigs. This method will be invoked for every
-     * test and should not reuse previous configurations unless they select 
their ports randomly when servers are started.
-     *
-     * Note the replica fetch max bytes is set to `1` in order to throttle the 
rate of replication for test
-     * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`.
-     */
-    @Override
-    public scala.collection.Seq<KafkaConfig> generateConfigs() {
-        Map<Integer, String> rackInfo = new HashMap<>();
-        rackInfo.put(0, "rack1");
-        rackInfo.put(1, "rack2");
-        rackInfo.put(2, "rack2");
-        rackInfo.put(3, "rack1");
-        rackInfo.put(4, "rack3");
-        rackInfo.put(5, "rack3");
-
-        List<Properties> brokerConfigs = ToolsTestUtils
-            .createBrokerProperties(6, zkConnectOrNull(), rackInfo, 
defaultNumPartitions, defaultReplicationFactor);
-
-        List<KafkaConfig> configs = new ArrayList<>();
-        for (Properties props : brokerConfigs) {
-            props.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1");
-            configs.add(KafkaConfig.fromProps(props));
-        }
-        return JavaConverters.asScalaBuffer(configs).toSeq();
-    }
+    private final ClusterInstance clusterInstance;
 
     private TopicCommand.TopicCommandOptions 
buildTopicCommandOptionsWithBootstrap(String... opts) {
+        String bootstrapServer = clusterInstance.bootstrapServers();
         String[] finalOptions = Stream.concat(Arrays.stream(opts),
                 Stream.of("--bootstrap-server", bootstrapServer)
         ).toArray(String[]::new);
         return new TopicCommand.TopicCommandOptions(finalOptions);
     }
 
-    @BeforeEach
-    public void setUp(TestInfo info) {
-        super.setUp(info);
-        // create adminClient
-        Properties props = new Properties();
-        bootstrapServer = bootstrapServers(listenerName());
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        adminClient = Admin.create(props);
-        topicService = new TopicCommand.TopicService(props, 
Optional.of(bootstrapServer));
-        testTopicName = String.format("%s-%s", 
info.getTestMethod().get().getName(), 
org.apache.kafka.test.TestUtils.randomString(10));
-        scalaBrokers = brokers();
-        scalaControllers = controllerServers();
+    static List<ClusterConfig> generate1() {
+        Map<String, String> serverProp = new HashMap<>();
+        serverProp.put(REPLICA_FETCH_MAX_BYTES_CONFIG, "1"); // if the config name 
is wrong, no exception is thrown
+
+        Map<Integer, Map<String, String>> rackInfo = new HashMap<>();
+        Map<String, String> infoPerBroker1 = new HashMap<>();
+        infoPerBroker1.put("broker.rack", "rack1");
+        Map<String, String> infoPerBroker2 = new HashMap<>();
+        infoPerBroker2.put("broker.rack", "rack2");
+        Map<String, String> infoPerBroker3 = new HashMap<>();
+        infoPerBroker3.put("broker.rack", "rack2");
+        Map<String, String> infoPerBroker4 = new HashMap<>();
+        infoPerBroker4.put("broker.rack", "rack1");
+        Map<String, String> infoPerBroker5 = new HashMap<>();
+        infoPerBroker5.put("broker.rack", "rack3");
+        Map<String, String> infoPerBroker6 = new HashMap<>();
+        infoPerBroker6.put("broker.rack", "rack3");
+
+        rackInfo.put(0, infoPerBroker1);
+        rackInfo.put(1, infoPerBroker2);
+        rackInfo.put(2, infoPerBroker3);
+        rackInfo.put(3, infoPerBroker4);
+        rackInfo.put(4, infoPerBroker5);
+        rackInfo.put(5, infoPerBroker6);
+
+        return Collections.singletonList(ClusterConfig.defaultBuilder()
+                .setBrokers(6)
+                .setServerProperties(serverProp)
+                .setPerServerProperties(rackInfo)
+                .build()
+        );
     }
 
-    @AfterEach
-    public void close() throws Exception {
-        if (topicService != null)
-            topicService.close();
-        if (adminClient != null)
-            adminClient.close();
+    TopicCommandIntegrationTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreate(String quorum) throws Exception {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, 1,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        
assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
-                "Admin client didn't see the created topic. It saw: " + 
adminClient.listTopics().names().get());
+    @ClusterTemplate("generate1")
+    public void testCreate(TestInfo testInfo) throws InterruptedException, 
ExecutionException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+            
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+                    "Admin client didn't see the created topic. It saw: " + 
adminClient.listTopics().names().get());
+
+            adminClient.deleteTopics(Collections.singletonList(testTopicName));
+            clusterInstance.waitForTopic(testTopicName, 0);
+            
Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(),
+                    "Admin client see the created topic. It saw: " + 
adminClient.listTopics().names().get());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithDefaults(String quorum) throws Exception {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        List<TopicPartitionInfo> partitions = adminClient
-            .describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames()
-            .get()
-            .get(testTopicName)
-            .partitions();
-        assertEquals(defaultNumPartitions, partitions.size(), "Unequal 
partition size: " + partitions.size());
-        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
+    @ClusterTemplate("generate1")
+    public void testCreateWithDefaults(TestInfo testInfo) throws 
InterruptedException, ExecutionException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+            
Assertions.assertTrue(adminClient.listTopics().names().get().contains(testTopicName),
+                    "Admin client didn't see the created topic. It saw: " + 
adminClient.listTopics().names().get());
+
+            List<TopicPartitionInfo> partitions = adminClient
+                    .describeTopics(Collections.singletonList(testTopicName))
+                    .allTopicNames()
+                    .get()
+                    .get(testTopicName)
+                    .partitions();
+            Assertions.assertEquals(defaultNumPartitions, partitions.size(), 
"Unequal partition size: " + partitions.size());
+            Assertions.assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
+
+            adminClient.deleteTopics(Collections.singletonList(testTopicName));
+            clusterInstance.waitForTopic(testTopicName, 0);
+            
Assertions.assertTrue(adminClient.listTopics().names().get().isEmpty(),
+                    "Admin client see the created topic. It saw: " + 
adminClient.listTopics().names().get());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithDefaultReplication(String quorum) throws 
Exception {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        List<TopicPartitionInfo>  partitions = adminClient
-            .describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames()
-            .get()
-            .get(testTopicName)
-            .partitions();
-        assertEquals(2, partitions.size(), "Unequal partition size: " + 
partitions.size());
-        assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
+    @ClusterTemplate("generate1")
+    public void testCreateWithDefaultReplication(TestInfo testInfo) throws 
InterruptedException, ExecutionException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, 2, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, 2);
+            List<TopicPartitionInfo>  partitions = adminClient
+                    .describeTopics(Collections.singletonList(testTopicName))
+                    .allTopicNames()
+                    .get()
+                    .get(testTopicName)
+                    .partitions();
+            assertEquals(2, partitions.size(), "Unequal partition size: " + 
partitions.size());
+            assertEquals(defaultReplicationFactor, (short) 
partitions.get(0).replicas().size(), "Unequal replication factor: " + 
partitions.get(0).replicas().size());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithDefaultPartitions(String quorum) throws 
Exception {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, 2,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        List<TopicPartitionInfo> partitions = adminClient
-            .describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames()
-            .get()
-            .get(testTopicName)
-            .partitions();
-
-        assertEquals(defaultNumPartitions, partitions.size(), "Unequal 
partition size: " + partitions.size());
-        assertEquals(2, (short) partitions.get(0).replicas().size(), 
"Partitions not replicated: " + partitions.get(0).replicas().size());
+    @ClusterTemplate("generate1")
+    public void testCreateWithDefaultPartitions(TestInfo testInfo) throws 
InterruptedException, ExecutionException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, (short) 2)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+            List<TopicPartitionInfo> partitions = adminClient
+                    .describeTopics(Collections.singletonList(testTopicName))
+                    .allTopicNames()
+                    .get()
+                    .get(testTopicName)
+                    .partitions();
+
+            assertEquals(defaultNumPartitions, partitions.size(), "Unequal 
partition size: " + partitions.size());
+            assertEquals(2, (short) partitions.get(0).replicas().size(), 
"Partitions not replicated: " + partitions.get(0).replicas().size());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithConfigs(String quorum) throws Exception {
-        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
-        Properties topicConfig = new Properties();
-        topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
+    @ClusterTemplate("generate1")
+    public void testCreateWithConfigs(TestInfo testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
 
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, 2, 2,
-                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
-        );
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+            Map<String, String> topicConfig = new HashMap<>();
+            topicConfig.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "1000");
+
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, 2, (short) 2).configs(topicConfig)));
+            clusterInstance.waitForTopic(testTopicName, 2);
 
-        Config configs = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
-        assertEquals(1000, 
Integer.valueOf(configs.get("delete.retention.ms").value()),
-                "Config not set correctly: " + 
configs.get("delete.retention.ms").value());
+
+            Config configs = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+            assertEquals(1000, 
Integer.valueOf(configs.get("delete.retention.ms").value()),
+                    "Config not set correctly: " + 
configs.get("delete.retention.ms").value());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWhenAlreadyExists(String quorum) {
-        // create the topic
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap(
-            "--create", "--partitions", 
Integer.toString(defaultNumPartitions), "--replication-factor", "1",
-                "--topic", testTopicName);
+    @ClusterTemplate("generate1")
+    public void testCreateWhenAlreadyExists(TestInfo testInfo) throws 
Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap(
+                    "--create", "--partitions", 
Integer.toString(defaultNumPartitions), "--replication-factor", "1",
+                    "--topic", testTopicName);
+
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+            // try to re-create the topic
+            assertThrows(TopicExistsException.class, () -> 
topicService.createTopic(createOpts),
+                    "Expected TopicExistsException to throw");
+        }
+    }
 
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        // try to re-create the topic
-        assertThrows(TopicExistsException.class, () -> 
topicService.createTopic(createOpts),
-                "Expected TopicExistsException to throw");
+    @ClusterTemplate("generate1")
+    public void testCreateWhenAlreadyExistsWithIfNotExists(TestInfo testInfo) 
throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+            TopicCommand.TopicCommandOptions createOpts =
+                    buildTopicCommandOptionsWithBootstrap("--create", 
"--topic", testTopicName, "--if-not-exists");
+            topicService.createTopic(createOpts);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) 
throws Exception {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        TopicCommand.TopicCommandOptions createOpts =
-                buildTopicCommandOptionsWithBootstrap("--create", "--topic", 
testTopicName, "--if-not-exists");
-        topicService.createTopic(createOpts);
+    private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> 
partitions, int partitionNumber) {
+        return 
partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithReplicaAssignment(String quorum) throws 
Exception {
-        scala.collection.mutable.HashMap<Object, Seq<Object>> 
replicaAssignmentMap = new scala.collection.mutable.HashMap<>();
+    @ClusterTemplate("generate1")
+    public void testCreateWithReplicaAssignment(TestInfo testInfo) throws 
Exception {
+        Map<Integer, List<Integer>> replicaAssignmentMap = new HashMap<>();
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            String testTopicName = testInfo.getTestMethod().get().getName() + 
"-" +
+                    TestUtils.randomString(10);
 
-        replicaAssignmentMap.put(0, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 5, (Object) 
4)).asScala().toSeq());
-        replicaAssignmentMap.put(1, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 3, (Object) 
2)).asScala().toSeq());
-        replicaAssignmentMap.put(2, 
JavaConverters.asScalaBufferConverter(Arrays.asList((Object) 1, (Object) 
0)).asScala().toSeq());
+            replicaAssignmentMap.put(0, Arrays.asList(5, 4));
+            replicaAssignmentMap.put(1, Arrays.asList(3, 2));
+            replicaAssignmentMap.put(2, Arrays.asList(1, 0));
 
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions,
-                defaultReplicationFactor, replicaAssignmentMap, new 
Properties()
-        );
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, replicaAssignmentMap)));
+            clusterInstance.waitForTopic(testTopicName, 3);
 
-        List<TopicPartitionInfo> partitions = adminClient
-            .describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames()
-            .get()
-            .get(testTopicName)
-            .partitions();
-        
-        assertEquals(3, partitions.size(),
-                "Unequal partition size: " + partitions.size());
-        assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0),
-                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 0));
-        assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1),
-                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 1));
-        assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2),
-                "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 2));
-    }
+            List<TopicPartitionInfo> partitions = adminClient
+                    .describeTopics(Collections.singletonList(testTopicName))
+                    .allTopicNames()
+                    .get()
+                    .get(testTopicName)
+                    .partitions();
 
-    private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> 
partitions, int partitionNumber) {
-        return 
partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList());
+            adminClient.close();
+            assertEquals(3, partitions.size(),
+                    "Unequal partition size: " + partitions.size());
+            assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 
0),
+                    "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 0));
+            assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 
1),
+                    "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 1));
+            assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 
2),
+                    "Unexpected replica assignment: " + 
getPartitionReplicas(partitions, 2));
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithInvalidReplicationFactor(String quorum) {
-        TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", 
"--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
-            "--topic", testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+    @ClusterTemplate("generate1")
+    public void testCreateWithInvalidReplicationFactor(TestInfo testInfo) 
throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+
+            TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", 
"--replication-factor", Integer.toString(Short.MAX_VALUE + 1),
+                    "--topic", testTopicName);
+            assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithNegativeReplicationFactor(String quorum) {
-        TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", "2", "--replication-factor", "-1", "--topic", 
testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+    @ClusterTemplate("generate1")
+    public void testCreateWithNegativeReplicationFactor(TestInfo testInfo) 
throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create",
+                    "--partitions", "2", "--replication-factor", "-1", 
"--topic", testTopicName);
+            assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateWithNegativePartitionCount(String quorum) {
-        TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", 
"--replication-factor", "1", "--topic", testTopicName);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+    @ClusterTemplate("generate1")
+    public void testCreateWithNegativePartitionCount(TestInfo testInfo) throws 
Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            TopicCommand.TopicCommandOptions opts = 
buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", 
"--replication-factor", "1", "--topic", testTopicName);
+            assertThrows(IllegalArgumentException.class, () -> 
topicService.createTopic(opts), "Expected IllegalArgumentException to throw");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testInvalidTopicLevelConfig(String quorum) {
-        TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
-            "--partitions", "1", "--replication-factor", "1", "--topic", 
testTopicName,
-            "--config", "message.timestamp.type=boom");
-        assertThrows(ConfigException.class, () -> 
topicService.createTopic(createOpts), "Expected ConfigException to throw");
+    @ClusterTemplate("generate1")
+    public void testInvalidTopicLevelConfig(TestInfo testInfo) {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient);
+
+            TopicCommand.TopicCommandOptions createOpts = 
buildTopicCommandOptionsWithBootstrap("--create",
+                    "--partitions", "1", "--replication-factor", "1", 
"--topic", testTopicName,
+                    "--config", "message.timestamp.type=boom");
+            assertThrows(ConfigException.class, () -> 
topicService.createTopic(createOpts), "Expected ConfigException to throw");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListTopics(String quorum) {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+    @ClusterTemplate("generate1")
+    public void testListTopics(TestInfo testInfo) throws InterruptedException {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
 
-        String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
-        assertTrue(output.contains(testTopicName), "Expected topic name to be 
present in output: " + output);
+            String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list"));
+            assertTrue(output.contains(testTopicName), "Expected topic name to 
be present in output: " + output);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListTopicsWithIncludeList(String quorum) {
-        String topic1 = "kafka.testTopic1";
-        String topic2 = "kafka.testTopic2";
-        String topic3 = "oooof.testTopic1";
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, 
scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        TestUtils.createTopicWithAdmin(adminClient, topic2, scalaBrokers, 
scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        TestUtils.createTopicWithAdmin(adminClient, topic3, scalaBrokers, 
scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+    @ClusterTemplate("generate1")
+    public void testListTopicsWithIncludeList(TestInfo testInfo) throws 
InterruptedException {
+        try (Admin adminClient = clusterInstance.createAdminClient()) {
+            String topic1 = "kafka.testTopic1";
+            String topic2 = "kafka.testTopic2";
+            String topic3 = "oooof.testTopic1";
+            int partition = 2;
+            short replicationFactor = 2;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(topic1, partition, replicationFactor)));
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(topic2, partition, replicationFactor)));
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(topic3, partition, replicationFactor)));
+            clusterInstance.waitForTopic(topic1, partition);
+            clusterInstance.waitForTopic(topic2, partition);
+            clusterInstance.waitForTopic(topic3, partition);
+
+            String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--topic", "kafka.*"));
+            assertTrue(output.contains(topic1), "Expected topic name " + 
topic1 + " to be present in output: " + output);
+            assertTrue(output.contains(topic2), "Expected topic name " + 
topic2 + " to be present in output: " + output);
+            assertFalse(output.contains(topic3), "Do not expect topic name " + 
topic3 + " to be present in output: " + output);
+        }
+    }
 
-        String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--topic", "kafka.*"));
+    @ClusterTemplate("generate1")
+    public void testListTopicsWithExcludeInternal(TestInfo testInfo) throws 
InterruptedException {
+        try (Admin adminClient = clusterInstance.createAdminClient();) {
+            String topic1 = "kafka.testTopic1";
+            String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
+            int partition = 2;
+            short replicationFactor = 2;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(topic1, partition, replicationFactor)));
+            clusterInstance.waitForTopic(topic1, partition);
+
+            String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--exclude-internal"));
+            assertTrue(output.contains(topic1), "Expected topic name " + 
topic1 + " to be present in output: " + output);
+            assertFalse(output.contains(hiddenConsumerTopic), "Do not expect 
topic name " + hiddenConsumerTopic + " to be present in output: " + output);
+        }
+    }
 
-        assertTrue(output.contains(topic1), "Expected topic name " + topic1 + 
" to be present in output: " + output);
-        assertTrue(output.contains(topic2), "Expected topic name " + topic2 + 
" to be present in output: " + output);
-        assertFalse(output.contains(topic3), "Do not expect topic name " + 
topic3 + " to be present in output: " + output);
+    @ClusterTemplate("generate1")
+    public void testAlterPartitionCount(TestInfo testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            int partition = 2;
+            short replicationFactor = 2;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partition, replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, partition);
+            
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName, "--partitions", "3"));
+
+            TestUtils.waitForCondition(
+                    () -> 
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+                    60000, testTopicName + "reassignmet not finished after 
60000 ms"
+            );
+
+            TestUtils.waitForCondition(
+                    () -> clusterInstance.brokers().values().stream().allMatch(
+                            b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
+                    TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new 
assignment propagating to broker");
+            TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
+            assertEquals(3, topicDescription.partitions().size(), "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testListTopicsWithExcludeInternal(String quorum) {
-        String topic1 = "kafka.testTopic1";
-        String hiddenConsumerTopic = Topic.GROUP_METADATA_TOPIC_NAME;
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, topic1, scalaBrokers, 
scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        TestUtils.createTopicWithAdmin(adminClient, hiddenConsumerTopic, 
scalaBrokers, scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+    @ClusterTemplate("generate1")
+    public void testAlterAssignment(TestInfo testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            int partition = 2;
+            short replicationFactor = 2;
 
-        String output = 
captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", 
"--exclude-internal"));
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partition, replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, partition);
 
-        assertTrue(output.contains(topic1), "Expected topic name " + topic1 + 
" to be present in output: " + output);
-        assertFalse(output.contains(hiddenConsumerTopic), "Do not expect topic 
name " + hiddenConsumerTopic + " to be present in output: " + output);
-    }
+            
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
+                    "--topic", testTopicName, "--replica-assignment", 
"5:3,3:1,4:2", "--partitions", "3"));
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterPartitionCount(String quorum) throws 
ExecutionException, InterruptedException {
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName, "--partitions", "3"));
-
-        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
-        kafka.utils.TestUtils.waitUntilTrue(
-            () -> brokers().forall(b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
-            () -> "Timeout waiting for new assignment propagating to broker", 
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-        TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
-        assertEquals(3, topicDescription.partitions().size(), "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
-    }
+            TestUtils.waitForCondition(
+                    () -> 
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+                    60000, testTopicName + "reassignmet not finished after 
60000 ms"
+            );
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterAssignment(String quorum) throws ExecutionException, 
InterruptedException {
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
-            "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "3"));
-
-        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
-        kafka.utils.TestUtils.waitUntilTrue(
-            () -> brokers().forall(b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
-            () -> "Timeout waiting for new assignment propagating to broker",
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
-        TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
-        assertEquals(3, topicDescription.partitions().size(), "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
-        List<Integer> partitionReplicas = 
getPartitionReplicas(topicDescription.partitions(), 2);
-        assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to have 
replicas 4,2. Got: " + partitionReplicas);
+            TestUtils.waitForCondition(
+                    () -> clusterInstance.brokers().values().stream().allMatch(
+                            b -> 
b.metadataCache().getTopicPartitions(testTopicName).size() == 3),
+                    TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new 
assignment propagating to broker");
+
+            TopicDescription topicDescription = 
adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get();
+            assertEquals(3, topicDescription.partitions().size(), "Expected 
partition count to be 3. Got: " + topicDescription.partitions().size());
+            List<Integer> partitionReplicas = 
getPartitionReplicas(topicDescription.partitions(), 2);
+            assertEquals(Arrays.asList(4, 2), partitionReplicas, "Expected to 
have replicas 4,2. Got: " + partitionReplicas);
+
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterAssignmentWithMoreAssignmentThanPartitions(String 
quorum) {
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        assertThrows(ExecutionException.class,
-            () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
-                "--topic", testTopicName, "--replica-assignment", 
"5:3,3:1,4:2,3:2", "--partitions", "3")),
-                "Expected to fail with ExecutionException");
+    @ClusterTemplate("generate1")
+    public void testAlterAssignmentWithMoreAssignmentThanPartitions(TestInfo 
testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+
+            int partition = 2;
+            short replicationFactor = 2;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partition, replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, partition);
+
+            assertThrows(ExecutionException.class,
+                    () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter",
+                            "--topic", testTopicName, "--replica-assignment", 
"5:3,3:1,4:2,3:2", "--partitions", "3")),
+                    "Expected to fail with ExecutionException");
+
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterAssignmentWithMorePartitionsThanAssignment(String 
quorum) {
-        int partition = 2;
-        short replicationFactor = 2;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, partition, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+    @ClusterTemplate("generate1")
+    public void testAlterAssignmentWithMorePartitionsThanAssignment(TestInfo 
testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            int partition = 2;
+            short replicationFactor = 2;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, partition, replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, partition);
+
+            assertThrows(ExecutionException.class,
+                    () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName,
+                            "--replica-assignment", "5:3,3:1,4:2", 
"--partitions", "6")),
+                    "Expected to fail with ExecutionException");
 
-        assertThrows(ExecutionException.class,
-            () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName,
-                "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6")),
-                "Expected to fail with ExecutionException");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterWithInvalidPartitionCount(String quorum) {
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        assertThrows(ExecutionException.class,
-            () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--partitions", "-1", "--topic", testTopicName)),
-                "Expected to fail with ExecutionException");
+    @ClusterTemplate("generate1")
+    public void testAlterWithInvalidPartitionCount(TestInfo testInfo) throws 
Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+            assertThrows(ExecutionException.class,
+                    () -> 
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--partitions", "-1", "--topic", testTopicName)),
+                    "Expected to fail with ExecutionException");
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterWhenTopicDoesntExist(String quorum) {
-        // alter a topic that does not exist without --if-exists
-        TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, 
"--partitions", "1");
-        TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient);
-        assertThrows(IllegalArgumentException.class, () -> 
topicService.alterTopic(alterOpts), "Expected to fail with 
IllegalArgumentException");
+    @ClusterTemplate("generate1")
+    public void testAlterWhenTopicDoesntExist(TestInfo testInfo) throws 
Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            // alter a topic that does not exist without --if-exists
+            TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, 
"--partitions", "1");
+            assertThrows(IllegalArgumentException.class, () -> 
topicService.alterTopic(alterOpts), "Expected to fail with 
IllegalArgumentException");
+
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testAlterWhenTopicDoesntExistWithIfExists(String quorum) 
throws ExecutionException, InterruptedException {
+    @ClusterTemplate("generate1")
+    public void testAlterWhenTopicDoesntExistWithIfExists(TestInfo testInfo) 
throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        Admin adminClient = clusterInstance.createAdminClient();
+
+        TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient);
         
topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", 
"--topic", testTopicName, "--partitions", "1", "--if-exists"));
+        adminClient.close();
+        topicService.close();
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testCreateAlterTopicWithRackAware(String quorum) throws 
Exception {
-        Map<Integer, String> rackInfo = new HashMap<>();
-        rackInfo.put(0, "rack1");
-        rackInfo.put(1, "rack2");
-        rackInfo.put(2, "rack2");
-        rackInfo.put(3, "rack1");
-        rackInfo.put(4, "rack3");
-        rackInfo.put(5, "rack3");
-
-        int numPartitions = 18;
-        int replicationFactor = 3;
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, numPartitions, replicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
+    @ClusterTemplate("generate1")
+    public void testCreateAlterTopicWithRackAware(TestInfo testInfo) throws 
Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+
+            Map<Integer, String> rackInfo = new HashMap<>();
+            rackInfo.put(0, "rack1");
+            rackInfo.put(1, "rack2");
+            rackInfo.put(2, "rack2");
+            rackInfo.put(3, "rack1");
+            rackInfo.put(4, "rack3");
+            rackInfo.put(5, "rack3");
+
+            int numPartitions = 18;
+            int replicationFactor = 3;
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, numPartitions, (short) replicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, numPartitions);
+
+            Map<Integer, List<Integer>> assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
+                    .allTopicNames().get().get(testTopicName).partitions()
+                    .stream()
+                    .collect(Collectors.toMap(
+                            info -> info.partition(),
+                            info -> 
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
+            checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), 
numPartitions,
+                    replicationFactor, true, true, true);
+
+            int alteredNumPartitions = 36;
+            // verify that adding partitions will also be rack aware
+            TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter",
+                    "--partitions", Integer.toString(alteredNumPartitions),
+                    "--topic", testTopicName);
+            topicService.alterTopic(alterOpts);
+
+            TestUtils.waitForCondition(
+                    () -> 
adminClient.listPartitionReassignments().reassignments().get().isEmpty(),
+                    60000, testTopicName + "reassignmet not finished after 
60000 ms"
+            );
+            TestUtils.waitForCondition(
+                    () -> 
clusterInstance.brokers().values().stream().allMatch(p -> 
p.metadataCache().getTopicPartitions(testTopicName).size() == 
alteredNumPartitions),
+                    TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new 
assignment propagating to broker");
+
+            assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
+                    
.allTopicNames().get().get(testTopicName).partitions().stream()
+                    .collect(Collectors.toMap(info -> info.partition(), info 
-> info.replicas().stream().map(Node::id).collect(Collectors.toList())));
+            checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), 
alteredNumPartitions, replicationFactor,
+                    true, true, true);
 
-        Map<Integer, List<Integer>> assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames().get().get(testTopicName).partitions()
-            .stream()
-            .collect(Collectors.toMap(
-                info -> info.partition(),
-                info -> 
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
-        checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), 
numPartitions,
-            replicationFactor, true, true, true);
-
-        int alteredNumPartitions = 36;
-        // verify that adding partitions will also be rack aware
-        TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter",
-            "--partitions", Integer.toString(alteredNumPartitions),
-            "--topic", testTopicName);
-        topicService.alterTopic(alterOpts);
-
-        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
-        kafka.utils.TestUtils.waitUntilTrue(
-            () -> brokers().forall(p -> 
p.metadataCache().getTopicPartitions(testTopicName).size() == 
alteredNumPartitions),
-            () -> "Timeout waiting for new assignment propagating to broker",
-            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
-
-        assignment = 
adminClient.describeTopics(Collections.singletonList(testTopicName))
-            .allTopicNames().get().get(testTopicName).partitions().stream()
-            .collect(Collectors.toMap(info -> info.partition(), info -> 
info.replicas().stream().map(Node::id).collect(Collectors.toList())));
-        checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), 
alteredNumPartitions, replicationFactor,
-            true, true, true);
+        }
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testConfigPreservationAcrossPartitionAlteration(String quorum) 
throws Exception {
-        String cleanUpPolicy = "compact";
-        Properties topicConfig = new Properties();
-        topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), topicConfig
-        );
-
-        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
-        Config props = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
-        // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, 
testTopicName)
-        assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Properties after creation don't contain " + cleanUpPolicy);
-        assertEquals(cleanUpPolicy, 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after 
creation have incorrect value");
+    @ClusterTemplate("generate1")
+    public void testConfigPreservationAcrossPartitionAlteration(TestInfo 
testInfo) throws Exception {
+        String testTopicName = testInfo.getTestMethod().get().getName() + "-" +
+                TestUtils.randomString(10);
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+
+            String cleanUpPolicy = "compact";
+            HashMap<String, String> topicConfig = new HashMap<>();
+            topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanUpPolicy);
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, 
defaultReplicationFactor).configs(topicConfig)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
+
+            ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
+            Config props = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+            assertNotNull(props.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Properties after creation don't contain " + cleanUpPolicy);
+            assertEquals(cleanUpPolicy, 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Properties after 
creation have incorrect value");
+
+            // modify the topic to add new partitions
+            int numPartitionsModified = 3;
+            TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter",
+                    "--partitions", Integer.toString(numPartitionsModified), 
"--topic", testTopicName);
+            topicService.alterTopic(alterOpts);
+
+            TestUtils.waitForCondition(
+                    () -> 
clusterInstance.brokers().values().stream().allMatch(p -> 
p.metadataCache().getTopicPartitions(testTopicName).size() == 
numPartitionsModified),
+                    TestUtils.DEFAULT_MAX_WAIT_MS, "Timeout waiting for new 
assignment propagating to broker");
+
+            Config newProps = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
+            assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG);
+            assertEquals(cleanUpPolicy, 
newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties 
have incorrect value");
 
-        // pre-create the topic config changes path to avoid a NoNodeException
-        if (!isKRaftTest()) {
-            
zkClient().makeSurePersistentPathExists(kafka.zk.ConfigEntityChangeNotificationZNode.path());
         }
+    }
 
-        // modify the topic to add new partitions
-        int numPartitionsModified = 3;
-        TopicCommand.TopicCommandOptions alterOpts = 
buildTopicCommandOptionsWithBootstrap("--alter",
-            "--partitions", Integer.toString(numPartitionsModified), 
"--topic", testTopicName);
-        topicService.alterTopic(alterOpts);
+    @ClusterTemplate("generate1")
+    public void testTopicDeletion(TestInfo testInfo) throws Exception {
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            String testTopicName = testInfo.getTestMethod().get().getName() + 
"-" +
+                    TestUtils.randomString(10);
 
-        TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L);
-        Config newProps = 
adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource);
-        assertNotNull(newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG), 
"Updated properties do not contain " + TopicConfig.CLEANUP_POLICY_CONFIG);
-        assertEquals(cleanUpPolicy, 
newProps.get(TopicConfig.CLEANUP_POLICY_CONFIG).value(), "Updated properties 
have incorrect value");
-    }
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(testTopicName, defaultNumPartitions, defaultReplicationFactor)));
+            clusterInstance.waitForTopic(testTopicName, defaultNumPartitions);
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testTopicDeletion(String quorum) throws Exception {
-        // create the NormalTopic
-        TestUtils.createTopicWithAdmin(adminClient, testTopicName, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        // delete the NormalTopic
-        TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
+            // delete the NormalTopic
+            TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName);
+            topicService.deleteTopic(deleteOpts);
 
-        if (!isKRaftTest()) {
-            String deletePath = 
kafka.zk.DeleteTopicsTopicZNode.path(testTopicName);
-            assertFalse(zkClient().pathExists(deletePath), "Delete path for 
topic shouldn't exist before deletion.");
+            TestUtils.waitForCondition(
+                    () -> 
adminClient.listTopics().listings().get().stream().noneMatch(topic -> 
topic.name().equals(testTopicName)),
+                    60000, "Delete topic fail in 60000 ms"
+            );
         }
-        topicService.deleteTopic(deleteOpts);
-        TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1, 
brokers());
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testTopicWithCollidingCharDeletionAndCreateAgain(String 
quorum) throws Exception {
-        // create the topic with colliding chars
-        String topicWithCollidingChar = "test.a";
-        TestUtils.createTopicWithAdmin(adminClient, topicWithCollidingChar, 
scalaBrokers, scalaControllers, defaultNumPartitions, defaultReplicationFactor,
-                scala.collection.immutable.Map$.MODULE$.empty(), new 
Properties()
-        );
-        // delete the topic
-        TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", 
topicWithCollidingChar);
-
-        if (!isKRaftTest()) {
-            String deletePath = 
kafka.zk.DeleteTopicsTopicZNode.path(topicWithCollidingChar);
-            assertFalse(zkClient().pathExists(deletePath), "Delete path for 
topic shouldn't exist before deletion.");
+    @ClusterTemplate("generate1")
+    public void testTopicWithCollidingCharDeletionAndCreateAgain(TestInfo 
testInfo) throws Exception {
+        try (Admin adminClient = clusterInstance.createAdminClient();
+             TopicCommand.TopicService topicService = new 
TopicCommand.TopicService(adminClient)) {
+            // create the topic with colliding chars
+            String topicWithCollidingChar = "test.a";
+            adminClient.createTopics(Collections.singletonList(new 
NewTopic(topicWithCollidingChar, defaultNumPartitions, 
defaultReplicationFactor)));
+            clusterInstance.waitForTopic(topicWithCollidingChar, 
defaultNumPartitions);
+
+            // delete the topic
+            TopicCommand.TopicCommandOptions deleteOpts = 
buildTopicCommandOptionsWithBootstrap("--delete", "--topic", 
topicWithCollidingChar);
+            topicService.deleteTopic(deleteOpts);
+            TestUtils.waitForCondition(
+                    () -> 
adminClient.listTopics().listings().get().stream().noneMatch(topic -> 
topic.name().equals(topicWithCollidingChar)),
+                    60000, "Delete topic fail in 60000 ms"
+            );
+
+            kafka.utils.TestUtils.verifyTopicDeletion(null, 
topicWithCollidingChar, 1, JavaConverters.asScalaIteratorConverter(

Review Comment:
   I opened a PR [here](https://github.com/apache/kafka/pull/16627/commits/3883c4b2115be2c59c86e51fc8b67648321aa4df).
   Please take a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to