OmniaGM commented on code in PR #13201: URL: https://github.com/apache/kafka/pull/13201#discussion_r1358474161
########## tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java: ########## @@ -0,0 +1,1050 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import kafka.admin.RackAwareTest; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.Logging; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.MetadataResponseData; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.collection.JavaConverters; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTestHarness implements Logging, RackAwareTest { + private Short defaultReplicationFactor = 1; + private Integer numPartitions = 1; + private TopicCommand.TopicService topicService; + private Admin adminClient; + private String bootstrapServer; + private String testTopicName; + private Integer defaultTimeout = 10000; + + /** + * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every + * test and should not reuse previous configurations unless they select their ports randomly when servers are started. + * + * Note the replica fetch max bytes is set to `1` in order to throttle the rate of replication for test + * `testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress`. + */ + @Override + public scala.collection.Seq<KafkaConfig> generateConfigs() { + Map<Integer, String> rackInfo = new HashMap<>(); + rackInfo.put(0, "rack1"); + rackInfo.put(1, "rack2"); + rackInfo.put(2, "rack2"); + rackInfo.put(3, "rack1"); + rackInfo.put(4, "rack3"); + rackInfo.put(5, "rack3"); + + List<Properties> brokerConfigs = ToolsTestUtils + .createBrokerProperties(6, zkConnectOrNull(), rackInfo, numPartitions, defaultReplicationFactor); + + List<KafkaConfig> configs = new ArrayList<>(); + for (Properties props : brokerConfigs) { + props.put(KafkaConfig.ReplicaFetchMaxBytesProp(), "1"); + configs.add(KafkaConfig.fromProps(props)); + } + return JavaConverters.asScalaBuffer(configs).toSeq(); + } + + private TopicCommand.TopicCommandOptions buildTopicCommandOptionsWithBootstrap(String... opts) { + String[] finalOptions = Stream.concat(Arrays.asList(opts).stream(), + Arrays.asList("--bootstrap-server", bootstrapServer).stream() + ).toArray(String[]::new); + return new TopicCommand.TopicCommandOptions(finalOptions); + } + + private void createAndWaitTopic(TopicCommand.TopicCommandOptions opts) throws Exception { + topicService.createTopic(opts); + waitForTopicCreated(opts.topic().get()); + } + + private void waitForTopicCreated(String topicName) { + waitForTopicCreated(topicName, defaultTimeout); + } + + private void waitForTopicCreated(String topicName, Integer timeout) { + TestUtils.waitForPartitionMetadata(brokers(), topicName, 0, timeout); + } + + @BeforeEach + public void setUp(TestInfo info) { + super.setUp(info); + // create adminClient + Properties props = new Properties(); + bootstrapServer = bootstrapServers(listenerName()); + props.put(org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + adminClient = Admin.create(props); + topicService = new TopicCommand.TopicService(props, Optional.of(bootstrapServer)); + testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), + new scala.util.Random().alphanumeric().take(10).mkString()); + } + + @AfterEach + public void close() throws Exception { + if (topicService != null) + topicService.close(); + if (adminClient != null) + adminClient.close(); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreate(String quorum) throws Exception { + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( + "--create", "--partitions", "2", "--replication-factor", "1", "--topic", testTopicName)); + + adminClient.listTopics().names().get().contains(testTopicName); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithDefaults(String quorum) throws Exception { + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName)); + + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + assertEquals(partitions.size(), numPartitions); + assertEquals((short) partitions.get(0).replicas().size(), defaultReplicationFactor); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithDefaultReplication(String quorum) throws Exception { + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--partitions", "2")); + + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + assertEquals(partitions.size(), 2); + assertEquals((short) partitions.get(0).replicas().size(), defaultReplicationFactor); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithDefaultPartitions(String quorum) throws Exception { + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--replication-factor", "2")); + + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + + assertEquals(partitions.size(), numPartitions); + assertEquals((short) partitions.get(0).replicas().size(), 2); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithConfigs(String quorum) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", "2", "--topic", testTopicName, "--config", + "delete.retention.ms=1000")); + + Config configs = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertEquals(1000, Integer.valueOf(configs.get("delete.retention.ms").value())); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWhenAlreadyExists(String quorum) throws Exception { + int numPartitions = 1; + + // create the topic + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap( + "--create", "--partitions", Integer.toString(numPartitions), "--replication-factor", "1", + "--topic", testTopicName); + createAndWaitTopic(createOpts); + + // try to re-create the topic + assertThrows(TopicExistsException.class, () -> topicService.createTopic(createOpts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWhenAlreadyExistsWithIfNotExists(String quorum) throws Exception { + TopicCommand.TopicCommandOptions createOpts = + buildTopicCommandOptionsWithBootstrap("--create", "--topic", testTopicName, "--if-not-exists"); + createAndWaitTopic(createOpts); + topicService.createTopic(createOpts); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithReplicaAssignment(String quorum) throws Exception { + // create the topic + TopicCommand.TopicCommandOptions createOpts = + buildTopicCommandOptionsWithBootstrap("--create", "--replica-assignment", "5:4,3:2,1:0", "--topic", testTopicName); + createAndWaitTopic(createOpts); + + List<TopicPartitionInfo> partitions = adminClient + .describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames() + .get() + .get(testTopicName) + .partitions(); + + assertEquals(3, partitions.size()); + assertEquals(Arrays.asList(5, 4), getPartitionReplicas(partitions, 0)); + assertEquals(Arrays.asList(3, 2), getPartitionReplicas(partitions, 1)); + assertEquals(Arrays.asList(1, 0), getPartitionReplicas(partitions, 2)); + } + + private List<Integer> getPartitionReplicas(List<TopicPartitionInfo> partitions, int partitionNumber) { + return partitions.get(partitionNumber).replicas().stream().map(Node::id).collect(Collectors.toList()); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithInvalidReplicationFactor(String quorum) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", "--replication-factor", Integer.toString(Short.MAX_VALUE + 1), + "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithNegativeReplicationFactor(String quorum) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "2", "--replication-factor", "-1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateWithNegativePartitionCount(String quorum) { + TopicCommand.TopicCommandOptions opts = buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "-1", "--replication-factor", "1", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.createTopic(opts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testInvalidTopicLevelConfig(String quorum) { + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName, + "--config", "message.timestamp.type=boom"); + assertThrows(ConfigException.class, () -> topicService.createTopic(createOpts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testListTopics(String quorum) throws Exception { + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap( + "--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)); + + String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list")); + assertTrue(output.contains(testTopicName)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testListTopicsWithIncludeList(String quorum) throws ExecutionException, InterruptedException { + String topic1 = "kafka.testTopic1"; + String topic2 = "kafka.testTopic2"; + String topic3 = "oooof.testTopic1"; + adminClient.createTopics( + Arrays.asList(new NewTopic(topic1, 2, (short) 2), + new NewTopic(topic2, 2, (short) 2), + new NewTopic(topic3, 2, (short) 2))) + .all().get(); + waitForTopicCreated(topic1); + waitForTopicCreated(topic2); + waitForTopicCreated(topic3); + + String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--topic", "kafka.*")); + + assertTrue(output.contains(topic1)); + assertTrue(output.contains(topic2)); + assertFalse(output.contains(topic3)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testListTopicsWithExcludeInternal(String quorum) throws ExecutionException, InterruptedException { + String topic1 = "kafka.testTopic1"; + adminClient.createTopics( + Arrays.asList(new NewTopic(topic1, 2, (short) 2), + new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, 2, (short) 2))) + .all().get(); + waitForTopicCreated(topic1); + + String output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); + + assertTrue(output.contains(topic1)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterPartitionCount(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); + waitForTopicCreated(testTopicName); + + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "3")); + + kafka.utils.TestUtils.waitUntilTrue( + () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), + () -> "Timeout waiting for new assignment propagating to broker", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); + assertTrue(topicDescription.partitions().size() == 3); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterAssignment(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); + waitForTopicCreated(testTopicName); + + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", + "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2", "--partitions", "3")); + kafka.utils.TestUtils.waitUntilTrue( + () -> brokers().forall(b -> b.metadataCache().getTopicPartitions(testTopicName).size() == 3), + () -> "Timeout waiting for new assignment propagating to broker", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + + TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)).topicNameValues().get(testTopicName).get(); + assertTrue(topicDescription.partitions().size() == 3); + List<Integer> partitionReplicas = getPartitionReplicas(topicDescription.partitions(), 2); + assertEquals(Arrays.asList(4, 2), partitionReplicas); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterAssignmentWithMoreAssignmentThanPartitions(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); + waitForTopicCreated(testTopicName); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", + "--topic", testTopicName, "--replica-assignment", "5:3,3:1,4:2,3:2", "--partitions", "3"))); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterAssignmentWithMorePartitionsThanAssignment(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Arrays.asList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); + waitForTopicCreated(testTopicName); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, + "--replica-assignment", "5:3,3:1,4:2", "--partitions", "6"))); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterWithInvalidPartitionCount(String quorum) throws Exception { + createAndWaitTopic( + buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName) + ); + + assertThrows(ExecutionException.class, + () -> topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--partitions", "-1", "--topic", testTopicName))); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterWhenTopicDoesntExist(String quorum) { + // alter a topic that does not exist without --if-exists + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1"); + TopicCommand.TopicService topicService = new TopicCommand.TopicService(adminClient); + assertThrows(IllegalArgumentException.class, () -> topicService.alterTopic(alterOpts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { + topicService.alterTopic(buildTopicCommandOptionsWithBootstrap("--alter", "--topic", testTopicName, "--partitions", "1", "--if-exists")); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testCreateAlterTopicWithRackAware(String quorum) throws Exception { + Map<Integer, String> rackInfo = new HashMap<Integer, String>(); + rackInfo.put(0, "rack1"); + rackInfo.put(1, "rack2"); + rackInfo.put(2, "rack2"); + rackInfo.put(3, "rack1"); + rackInfo.put(4, "rack3"); + rackInfo.put(5, "rack3"); + + int numPartitions = 18; + int replicationFactor = 3; + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", Integer.toString(numPartitions), + "--replication-factor", Integer.toString(replicationFactor), + "--topic", testTopicName); + createAndWaitTopic(createOpts); + + Map<Integer, List<Integer>> assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions() + .stream() + .collect(Collectors.toMap( + info -> info.partition(), + info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); + checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), numPartitions, + replicationFactor, true, true, true); + + int alteredNumPartitions = 36; + // verify that adding partitions will also be rack aware + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", + "--partitions", Integer.toString(alteredNumPartitions), + "--topic", testTopicName); + topicService.alterTopic(alterOpts); + + kafka.utils.TestUtils.waitUntilTrue( + () -> brokers().forall(p -> p.metadataCache().getTopicPartitions(testTopicName).size() == alteredNumPartitions), + () -> "Timeout waiting for new assignment propagating to broker", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + + assignment = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().stream() + .collect(Collectors.toMap(info -> info.partition(), info -> info.replicas().stream().map(Node::id).collect(Collectors.toList()))); + checkReplicaDistribution(assignment, rackInfo, rackInfo.size(), alteredNumPartitions, replicationFactor, + true, true, true); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testConfigPreservationAcrossPartitionAlteration(String quorum) throws Exception { + int numPartitionsOriginal = 1; + String cleanupKey = "cleanup.policy"; + String cleanupVal = "compact"; + + // create the topic + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", Integer.toString(numPartitionsOriginal), + "--replication-factor", "1", + "--config", cleanupKey + "=" + cleanupVal, + "--topic", testTopicName); + createAndWaitTopic(createOpts); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName); + Config props = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + // val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, testTopicName) + assertNotNull(props.get(cleanupKey), "Properties after creation don't contain " + cleanupKey); + assertEquals(cleanupVal, props.get(cleanupKey).value(), "Properties after creation have incorrect value"); + + // pre-create the topic config changes path to avoid a NoNodeException + if (!isKRaftTest()) { + zkClient().makeSurePersistentPathExists(kafka.zk.ConfigEntityChangeNotificationZNode.path()); + } + + // modify the topic to add new partitions + int numPartitionsModified = 3; + TopicCommand.TopicCommandOptions alterOpts = buildTopicCommandOptionsWithBootstrap("--alter", + "--partitions", Integer.toString(numPartitionsModified), "--topic", testTopicName); + topicService.alterTopic(alterOpts); + Config newProps = adminClient.describeConfigs(Collections.singleton(configResource)).all().get().get(configResource); + assertNotNull(newProps.get(cleanupKey), "Updated properties do not contain " + cleanupKey); + assertEquals(cleanupVal, newProps.get(cleanupKey).value(), "Updated properties have incorrect value"); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testTopicDeletion(String quorum) throws Exception { + // create the NormalTopic + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "1", + "--replication-factor", "1", + "--topic", testTopicName); + createAndWaitTopic(createOpts); + + // delete the NormalTopic + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); + + if (!isKRaftTest()) { + String deletePath = kafka.zk.DeleteTopicsTopicZNode.path(testTopicName); + assertFalse(zkClient().pathExists(deletePath), "Delete path for topic shouldn't exist before deletion."); + } + topicService.deleteTopic(deleteOpts); + TestUtils.verifyTopicDeletion(zkClientOrNull(), testTopicName, 1, brokers()); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testTopicWithCollidingCharDeletionAndCreateAgain(String quorum) throws Exception { + // create the topic with colliding chars + String topicWithCollidingChar = "test.a"; + TopicCommand.TopicCommandOptions createOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "1", + "--replication-factor", "1", + "--topic", topicWithCollidingChar); + createAndWaitTopic(createOpts); + + // delete the topic + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", topicWithCollidingChar); + + if (!isKRaftTest()) { + String deletePath = kafka.zk.DeleteTopicsTopicZNode.path(topicWithCollidingChar); + assertFalse(zkClient().pathExists(deletePath), "Delete path for topic shouldn't exist before deletion."); + } + topicService.deleteTopic(deleteOpts); + TestUtils.verifyTopicDeletion(zkClientOrNull(), topicWithCollidingChar, 1, brokers()); + assertDoesNotThrow(() -> createAndWaitTopic(createOpts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDeleteInternalTopic(String quorum) throws Exception { + // create the offset topic + TopicCommand.TopicCommandOptions createOffsetTopicOpts = buildTopicCommandOptionsWithBootstrap("--create", + "--partitions", "1", + "--replication-factor", "1", + "--topic", Topic.GROUP_METADATA_TOPIC_NAME); + createAndWaitTopic(createOffsetTopicOpts); + + // Try to delete the Topic.GROUP_METADATA_TOPIC_NAME which is allowed by default. + // This is a difference between the new and the old command as the old one didn't allow internal topic deletion. + // If deleting internal topics is not desired, ACLS should be used to control it. + TopicCommand.TopicCommandOptions deleteOffsetTopicOpts = + buildTopicCommandOptionsWithBootstrap("--delete", "--topic", Topic.GROUP_METADATA_TOPIC_NAME); + String deleteOffsetTopicPath = kafka.zk.DeleteTopicsTopicZNode.path(Topic.GROUP_METADATA_TOPIC_NAME); + if (!isKRaftTest()) { + assertFalse(zkClient().pathExists(deleteOffsetTopicPath), "Delete path for topic shouldn't exist before deletion."); + } + topicService.deleteTopic(deleteOffsetTopicOpts); + TestUtils.verifyTopicDeletion(zkClientOrNull(), Topic.GROUP_METADATA_TOPIC_NAME, 1, brokers()); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDeleteWhenTopicDoesntExist(String quorum) { + // delete a topic that does not exist + TopicCommand.TopicCommandOptions deleteOpts = buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName); + assertThrows(IllegalArgumentException.class, () -> topicService.deleteTopic(deleteOpts)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDeleteWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { + topicService.deleteTopic(buildTopicCommandOptionsWithBootstrap("--delete", "--topic", testTopicName, "--if-exists")); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribe(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 2, (short) 2))).all().get(); + waitForTopicCreated(testTopicName); + + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); + String[] rows = output.split("\n"); + assertEquals(3, rows.length); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeWhenTopicDoesntExist(String quorum) { + assertThrows(IllegalArgumentException.class, + () -> topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName))); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeWhenTopicDoesntExistWithIfExists(String quorum) throws ExecutionException, InterruptedException { + topicService.describeTopic(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--if-exists")); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeUnavailablePartitions(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 6, (short) 1))).all().get(); + waitForTopicCreated(testTopicName); + + try { + // check which partition is on broker 0 which we'll kill + TopicDescription testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName); + int partitionOnBroker0 = testTopicDescription.partitions().stream() + .filter(partition -> partition.leader().id() == 0) + .findFirst().get().partition(); + + killBroker(0); + + // wait until the topic metadata for the test topic is propagated to each alive broker + kafka.utils.TestUtils.waitUntilTrue( + () -> { + boolean result = true; + for (KafkaBroker server : JavaConverters.asJavaCollection(brokers())) { + if (server.config().brokerId() != 0) { + Set<String> topicNames = Collections.singleton(testTopicName); + Collection<MetadataResponseData.MetadataResponseTopic> topicMetadatas = + JavaConverters.asJavaCollection(server.dataPlaneRequestProcessor().metadataCache() + .getTopicMetadata(JavaConverters.asScalaSetConverter(topicNames).asScala().toSet(), + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + false, false) + ); + Optional<MetadataResponseData.MetadataResponseTopic> testTopicMetadata = topicMetadatas.stream() + .filter(metadata -> metadata.name().equals(testTopicName)) + .findFirst(); + if (!testTopicMetadata.isPresent()) { + throw new AssertionError("Topic metadata is not found in metadata cache"); + } + Optional<MetadataResponseData.MetadataResponsePartition> testPartitionMetadata = testTopicMetadata.get().partitions().stream() + .filter(metadata -> metadata.partitionIndex() == partitionOnBroker0) + .findFirst(); + if (!testPartitionMetadata.isPresent()) { + throw new AssertionError("Partition metadata is not found in metadata cache"); + } + result = result && testPartitionMetadata.get().errorCode() == Errors.LEADER_NOT_AVAILABLE.code(); + } + } + return result; + }, + () -> String.format("Partition metadata for %s is not propagated", testTopicName), + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + + // grab the console output and assert + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName, "--unavailable-partitions")); + String[] rows = output.split("\n"); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + assertTrue(rows[0].contains("Leader: none\tReplicas: 0\tIsr:")); + } finally { + restartDeadBrokers(false); + } + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeUnderReplicatedPartitions(String quorum) throws ExecutionException, InterruptedException { + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 6))).all().get(); + waitForTopicCreated(testTopicName); + + try { + killBroker(0); + if (isKRaftTest()) { + ensureConsistentKRaftMetadata(); + } else { + TestUtils.waitForPartitionMetadata(aliveBrokers(), testTopicName, 0, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS); + } + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); + String[] rows = output.split("\n"); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), String.format("Unexpected output: %s", rows[0])); + } finally { + restartDeadBrokers(false); + } + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeUnderMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException { + Map<String, String> configMap = new HashMap<>(); + configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 6).configs(configMap))).all().get(); + waitForTopicCreated(testTopicName); + + try { + killBroker(0); + if (isKRaftTest()) { + ensureConsistentKRaftMetadata(); + } else { + kafka.utils.TestUtils.waitUntilTrue( + () -> aliveBrokers().forall(b -> b.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 5), + () -> String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName), + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L + ); + } + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); + String[] rows = output.split("\n"); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + } finally { + restartDeadBrokers(false); + } + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress(String quorum) throws ExecutionException, InterruptedException { + Map<String, String> configMap = new HashMap<>(); + Short replicationFactor = 1; + int partitions = 1; + TopicPartition tp = new TopicPartition(testTopicName, 0); + + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, partitions, replicationFactor).configs(configMap)) + ).all().get(); + waitForTopicCreated(testTopicName); + + // Produce multiple batches. + TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); + TestUtils.generateAndProduceMessages(brokers(), testTopicName, 10, -1); + + // Enable throttling. Note the broker config sets the replica max fetch bytes to `1` upon to minimize replication + // throughput so the reassignment doesn't complete quickly. + List<Integer> brokerIds = JavaConverters.seqAsJavaList(brokers()).stream() + .map(broker -> broker.config().brokerId()).collect(Collectors.toList()); + + ToolsTestUtils.setReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp), 1); + + TopicDescription testTopicDesc = adminClient.describeTopics(Collections.singleton(testTopicName)).allTopicNames().get().get(testTopicName); + TopicPartitionInfo firstPartition = testTopicDesc.partitions().get(0); + + List<Integer> replicasOfFirstPartition = firstPartition.replicas().stream().map(Node::id).collect(Collectors.toList()); + List<Integer> replicasDiff = new ArrayList<>(brokerIds); + replicasDiff.removeAll(replicasOfFirstPartition); + Integer targetReplica = replicasDiff.get(0); + + adminClient.alterPartitionReassignments(Collections.singletonMap(tp, + Optional.of(new NewPartitionReassignment(Collections.singletonList(targetReplica))))).all().get(); + + // let's wait until the LAIR is propagated + kafka.utils.TestUtils.waitUntilTrue( + () -> { + try { + return !adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get() + .get(tp).addingReplicas().isEmpty(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }, + () -> "Reassignment didn't add the second node", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + + // describe the topic and test if it's under-replicated + String simpleDescribeOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--topic", testTopicName)); + String[] simpleDescribeOutputRows = simpleDescribeOutput.split("\n"); + assertTrue(simpleDescribeOutputRows[0].startsWith(String.format("Topic: %s", testTopicName))); + assertEquals(2, simpleDescribeOutputRows.length); + + String underReplicatedOutput = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-replicated-partitions")); + assertEquals("", underReplicatedOutput, + String.format("--under-replicated-partitions shouldn't return anything: '%s'", underReplicatedOutput)); + + // Verify reassignment is still ongoing. + PartitionReassignment reassignments = adminClient.listPartitionReassignments(Collections.singleton(tp)).reassignments().get().get(tp); + assertFalse(reassignments.addingReplicas().isEmpty()); + + ToolsTestUtils.removeReplicationThrottleForPartitions(adminClient, brokerIds, Collections.singleton(tp)); + TestUtils.waitForAllReassignmentsToComplete(adminClient, 100L); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeAtMinIsrPartitions(String quorum) throws ExecutionException, InterruptedException { + Map<String, String> configMap = new HashMap<>(); + configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4"); + + adminClient.createTopics( + Collections.singletonList(new NewTopic(testTopicName, 1, (short) 6).configs(configMap))).all().get(); + waitForTopicCreated(testTopicName); + + try { + killBroker(0); + killBroker(1); + + if (isKRaftTest()) { + ensureConsistentKRaftMetadata(); + } else { + kafka.utils.TestUtils.waitUntilTrue( + () -> aliveBrokers().forall(broker -> broker.metadataCache().getPartitionInfo(testTopicName, 0).get().isr().size() == 4), + () -> String.format("Timeout waiting for partition metadata propagating to brokers for %s topic", testTopicName), + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L + ); + } + + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--at-min-isr-partitions")); + String[] rows = output.split("\n"); + assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName))); + assertEquals(1, rows.length); + } finally { + restartDeadBrokers(false); + } + } + + /** + * Test describe --under-min-isr-partitions option with four topics: + * (1) topic with partition under the configured min ISR count + * (2) topic with under-replicated partition (but not under min ISR count) + * (3) topic with offline partition + * (4) topic with fully replicated partition + * + * Output should only display the (1) topic with partition under min ISR count and (3) topic with offline partition + */ + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeUnderMinIsrPartitionsMixed(String quorum) throws ExecutionException, InterruptedException { + String underMinIsrTopic = "under-min-isr-topic"; + String notUnderMinIsrTopic = "not-under-min-isr-topic"; + String offlineTopic = "offline-topic"; + String fullyReplicatedTopic = "fully-replicated-topic"; + + Map<String, String> configMap = new HashMap<>(); + configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "6"); + + adminClient.createTopics( + java.util.Arrays.asList( + new NewTopic(underMinIsrTopic, 1, (short) 6).configs(configMap), + new NewTopic(notUnderMinIsrTopic, 1, (short) 6), + new NewTopic(offlineTopic, Collections.singletonMap(0, Collections.singletonList(0))), + new NewTopic(fullyReplicatedTopic, Collections.singletonMap(0, java.util.Arrays.asList(1, 2, 3)))) + ).all().get(); + + waitForTopicCreated(underMinIsrTopic); + waitForTopicCreated(notUnderMinIsrTopic); + waitForTopicCreated(offlineTopic); + waitForTopicCreated(fullyReplicatedTopic); + + try { + killBroker(0); + if (isKRaftTest()) { + ensureConsistentKRaftMetadata(); + } else { + kafka.utils.TestUtils.waitUntilTrue( + () -> aliveBrokers().forall(broker -> + broker.metadataCache().getPartitionInfo(underMinIsrTopic, 0).get().isr().size() < 6 && + broker.metadataCache().getPartitionInfo(offlineTopic, 0).get().leader() == MetadataResponse.NO_LEADER_ID), + () -> "Timeout waiting for partition metadata propagating to brokers for underMinIsrTopic topic", + org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + } + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--under-min-isr-partitions")); + String[] rows = output.split("\n"); + assertTrue(rows[0].startsWith(String.format("Topic: %s", underMinIsrTopic))); + assertTrue(rows[1].startsWith(String.format("\tTopic: %s", offlineTopic))); + assertEquals(2, rows.length); + } finally { + restartDeadBrokers(false); + } + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeReportOverriddenConfigs(String quorum) throws Exception { + String config = "file.delete.delay.ms=1000"; + createAndWaitTopic(buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "2", + "--replication-factor", "2", "--topic", testTopicName, "--config", config)); + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe")); + assertTrue(output.contains(config), String.format("Describe output should have contained %s", config)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeAndListTopicsWithoutInternalTopics(String quorum) throws Exception { + createAndWaitTopic( + buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)); + // create a internal topic + createAndWaitTopic( + buildTopicCommandOptionsWithBootstrap("--create", "--partitions", "1", "--replication-factor", "1", "--topic", Topic.GROUP_METADATA_TOPIC_NAME)); + + // test describe + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--describe", "--describe", "--exclude-internal")); + assertTrue(output.contains(testTopicName), + String.format("Output should have contained %s", testTopicName)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + + // test list + output = captureListTopicStandardOut(buildTopicCommandOptionsWithBootstrap("--list", "--exclude-internal")); + assertTrue(output.contains(testTopicName)); + assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME)); + } + + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"zk", "kraft"}) + public void testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized(String quorum) throws ExecutionException, InterruptedException { Review Comment: good catch I updated the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
