OmniaGM commented on code in PR #13201: URL: https://github.com/apache/kafka/pull/13201#discussion_r1297476446
########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreatePartitionsOptions; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class TopicCommand { + private static final Logger log = LoggerFactory.getLogger(TopicCommand.class); + + public static void main(String[] args) { + Exit.exit(mainNoExit(args)); + } + + private static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + TopicCommandOptions opts = new TopicCommandOptions(args); + opts.checkArgs(); + TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer()); + int exitCode = 0; + try { + if (opts.hasCreateOption()) { + topicService.createTopic(opts); + } else if (opts.hasAlterOption()) { + topicService.alterTopic(opts); + } else if (opts.hasListOption()) { + topicService.listTopics(opts); + } else if (opts.hasDescribeOption()) { + topicService.describeTopic(opts); + } else if (opts.hasDeleteOption()) { + topicService.deleteTopic(opts); + } + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + printException(cause); + } else { + printException(e); + } + exitCode = 1; + } catch (Throwable e) { + printException(e); + exitCode = 1; + } finally { + topicService.close(); + Exit.exit(exitCode); + } + + } + + private static void printException(Throwable e) { + System.out.println("Error while executing topic command : " + e.getMessage()); + log.error(Utils.stackTrace(e)); + } + + @SuppressWarnings("deprecation") + static Map<Integer, List<Integer>> parseReplicaAssignment(String replicaAssignmentList) { + String[] partitionList = replicaAssignmentList.split(","); + Map<Integer, List<Integer>> ret = new LinkedHashMap<Integer, List<Integer>>(); + for (int i = 0; i < partitionList.length; i++) { + List<Integer> brokerList = Arrays.stream(partitionList[i].split(":")) + .map(String::trim) + .mapToInt(Integer::parseInt) + .boxed() + .collect(Collectors.toList()); + Collection<Integer> duplicateBrokers = ToolsUtils.duplicates(brokerList); + if (!duplicateBrokers.isEmpty()) { + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: " + + duplicateBrokers.stream() + .map(Object::toString) + .collect(Collectors.joining(",")) + ); + } + ret.put(i, brokerList); + if (ret.get(i).size() != ret.get(0).size()) { + throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList); + } + } + return ret; + } + + @SuppressWarnings("deprecation") + private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) { + List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList()) + .stream() + .map(s -> Arrays.asList(s.split("\\s*=\\s*"))) + .collect(Collectors.toList()); + + if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) { + throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\"."); + } + + Properties props = new Properties(); + configsToBeAdded.stream() + .forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim())); + LogConfig.validate(props); + if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) { + System.out.println("WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " + + "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " + + "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0."); + } + return props; + } + + // It is possible for a reassignment to complete between the time we have fetched its state and the time + // we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor. + public static boolean isReassignmentInProgress(TopicPartitionInfo tpi, PartitionReassignment ra) { + // Reassignment is still in progress as long as the removing and adding replicas are still present + Set<Integer> allReplicaIds = tpi.replicas().stream().map(Node::id).collect(Collectors.toSet()); + Set<Integer> changingReplicaIds = new HashSet<>(); + if (ra != null) { + changingReplicaIds.addAll(ra.removingReplicas()); + changingReplicaIds.addAll(ra.addingReplicas()); + } + return allReplicaIds.stream().anyMatch(changingReplicaIds::contains); + + } + + private static Integer getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) { + return isReassignmentInProgress(tpi, reassignment) ? + reassignment.replicas().size() - reassignment.addingReplicas().size() : + tpi.replicas().size(); + } + + /** + * ensures topic existence and throws exception if topic doesn't exist + * + * @param foundTopics Topics that were found to match the requested topic name. + * @param requestedTopic Name of the topic that was requested. + * @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested name does not exist. + */ + private static void ensureTopicExists(List<String> foundTopics, String requestedTopic, Boolean requireTopicExists) { + // If no topic name was mentioned, do not need to throw exception. + if (!(requestedTopic.isEmpty() || !Optional.ofNullable(requestedTopic).isPresent()) && requireTopicExists && foundTopics.isEmpty()) { + // If given topic doesn't exist then throw exception + throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic)); + } + } + + private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, Boolean excludeInternalTopics) { + if (topicIncludeList.isPresent()) { + IncludeList topicsFilter = new IncludeList(topicIncludeList.get()); + return allTopics.stream() + .filter(topic -> topicsFilter.isTopicAllowed(topic, excludeInternalTopics)) + .collect(Collectors.toList()); + } else { + return allTopics.stream() + .filter(topic -> !(Topic.isInternal(topic) && excludeInternalTopics)) + .collect(Collectors.toList()); + } + } + + /** + * ensures topic existence and throws exception if topic doesn't exist + * + * @param foundTopicIds Topics that were found to match the requested topic id. + * @param requestedTopicId Id of the topic that was requested. + * @param requireTopicIdExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested id does not exist. + */ + private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, Boolean requireTopicIdExists) { + // If no topic id was mentioned, do not need to throw exception. + if (requestedTopicId != null && requireTopicIdExists && foundTopicIds.isEmpty()) { + // If given topicId doesn't exist then throw exception + throw new IllegalArgumentException(String.format("TopicId '%s' does not exist as expected", requestedTopicId)); + } + } + + static class CommandTopicPartition { + private final TopicCommandOptions opts; + private final Optional<String> name; + private final Optional<Integer> replicationFactor; + private final Optional<Integer> partitions; + private final Map<Integer, List<Integer>> replicaAssignment; + private final Properties configsToAdd; + + public CommandTopicPartition(TopicCommandOptions options) { + opts = options; + name = options.topic(); + partitions = options.partitions(); + replicationFactor = options.replicationFactor(); + replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap()); + configsToAdd = parseTopicConfigsToBeAdded(options); + } + + public Boolean hasReplicaAssignment() { + return !replicaAssignment.isEmpty(); + } + + public Boolean hasPartitions() { + return partitions.isPresent(); + } + + public Boolean ifTopicDoesntExist() { + return opts.ifNotExists(); + } + } + + static class TopicDescription { + private final String topic; + private final Uuid topicId; + private final Integer numPartitions; + private final Integer replicationFactor; + private final Config config; + private final Boolean markedForDeletion; + + public TopicDescription(String topic, Uuid topicId, Integer numPartitions, Integer replicationFactor, Config config, Boolean markedForDeletion) { + this.topic = topic; + this.topicId = topicId; + this.numPartitions = numPartitions; + this.replicationFactor = replicationFactor; + this.config = config; + this.markedForDeletion = markedForDeletion; + } + + public void printDescription() { + String configsAsString = config.entries().stream() + .filter(config -> !config.isDefault()) + .map(ce -> String.format("%s=%s", ce.name(), ce.value())) + .collect(Collectors.joining(",")); + System.out.print(String.format("Topic: %s", topic)); + if (topicId != Uuid.ZERO_UUID) + System.out.print(String.format("\tTopicId: %s", topicId)); + System.out.print(String.format("\tPartitionCount: %s", numPartitions)); + System.out.print(String.format("\tReplicationFactor: %s", replicationFactor)); + System.out.print(String.format("\tConfigs: %s", configsAsString)); + System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : ""); + System.out.println(); + } + } + + static class PartitionDescription { + private final String topic; + private final TopicPartitionInfo info; + private final Config config; + private final Boolean markedForDeletion; + private final PartitionReassignment reassignment; + + PartitionDescription(String topic, + TopicPartitionInfo info, + Config config, + Boolean markedForDeletion, + PartitionReassignment reassignment) { + this.topic = topic; + this.info = info; + this.config = config; + this.markedForDeletion = markedForDeletion; + this.reassignment = reassignment; + } + + public Integer minIsrCount() { + return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value()); + } + + public Boolean isUnderReplicated() { + return getReplicationFactor(info, reassignment) - info.isr().size() > 0; + } + + public boolean hasLeader() { + return info.leader() != null; + } + + public Boolean isUnderMinIsr() { + return !hasLeader() || info.isr().size() < minIsrCount(); + } + + public Boolean isAtMinIsrPartitions() { + return minIsrCount() == info.isr().size(); + + } + + public Boolean hasUnavailablePartitions(Set<Integer> liveBrokers) { + return !hasLeader() || !liveBrokers.contains(info.leader().id()); + } + + public void printDescription() { + System.out.print("\tTopic: " + topic); + System.out.print("\tPartition: " + info.partition()); + System.out.print("\tLeader: " + (hasLeader() ? info.leader().id() : "none")); + System.out.print("\tReplicas: " + info.replicas().stream() + .map(node -> Integer.toString(node.id())) + .collect(Collectors.joining(","))); + System.out.print("\tIsr: " + info.isr().stream() + .map(node -> Integer.toString(node.id())) + .collect(Collectors.joining(","))); + if (reassignment != null) { + System.out.print("\tAdding Replicas: " + reassignment.addingReplicas().stream() + .map(node -> node.toString()) + .collect(Collectors.joining(","))); + System.out.print("\tRemoving Replicas: " + reassignment.removingReplicas().stream() + .map(node -> node.toString()) + .collect(Collectors.joining(","))); + } + System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : ""); + System.out.println(); + } + + } + + static class DescribeOptions { + private final TopicCommandOptions opts; + private final Set<Integer> liveBrokers; + private final boolean describeConfigs; + private final boolean describePartitions; + + public DescribeOptions(TopicCommandOptions opts, Set<Integer> liveBrokers) { + this.opts = opts; + this.liveBrokers = liveBrokers; + this.describeConfigs = !opts.reportUnavailablePartitions() && + !opts.reportUnderReplicatedPartitions() && + !opts.reportUnderMinIsrPartitions() && + !opts.reportAtMinIsrPartitions(); + this.describePartitions = !opts.reportOverriddenConfigs(); + } + + private boolean shouldPrintUnderReplicatedPartitions(PartitionDescription partitionDescription) { + return opts.reportUnderReplicatedPartitions() && partitionDescription.isUnderReplicated(); + } + + private boolean shouldPrintUnavailablePartitions(PartitionDescription partitionDescription) { + return opts.reportUnavailablePartitions() && partitionDescription.hasUnavailablePartitions(liveBrokers); + } + + private boolean shouldPrintUnderMinIsrPartitions(PartitionDescription partitionDescription) { + return opts.reportUnderMinIsrPartitions() && partitionDescription.isUnderMinIsr(); + } + + private boolean shouldPrintAtMinIsrPartitions(PartitionDescription partitionDescription) { + return opts.reportAtMinIsrPartitions() && partitionDescription.isAtMinIsrPartitions(); + } + + private boolean shouldPrintTopicPartition(PartitionDescription partitionDesc) { + return describeConfigs || + shouldPrintUnderReplicatedPartitions(partitionDesc) || + shouldPrintUnavailablePartitions(partitionDesc) || + shouldPrintUnderMinIsrPartitions(partitionDesc) || + shouldPrintAtMinIsrPartitions(partitionDesc); + } + + public void maybePrintPartitionDescription(PartitionDescription desc) { + if (shouldPrintTopicPartition(desc)) { + desc.printDescription(); + } + } + } + + public static class TopicService implements AutoCloseable { + private Admin adminClient; + + public TopicService(Properties commandConfig, Optional<String> bootstrapServer) { + this.adminClient = createAdminClient(commandConfig, bootstrapServer); + } + + public TopicService(Admin admin) { + this.adminClient = admin; + } + + private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) { + if (bootstrapServer.isPresent()) { + commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + } + return Admin.create(commandConfig); + } + + public void createTopic(TopicCommandOptions opts) throws Exception { + CommandTopicPartition topic = new CommandTopicPartition(opts); + if (Topic.hasCollisionChars(topic.name.get())) { + System.out.println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " + + "collide. To avoid issues it is best to use either, but not both."); + } + createTopic(topic); + } + + public void createTopic(CommandTopicPartition topic) throws Exception { + if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) { + throw new IllegalArgumentException("The replication factor must be between 1 and " + Short.MAX_VALUE + " inclusive"); + } + if (topic.partitions.filter(p -> p < 1).isPresent()) { + throw new IllegalArgumentException("The partitions must be greater than 0"); + } + + try { + NewTopic newTopic; + if (topic.hasReplicaAssignment()) { + newTopic = new NewTopic(topic.name.get(), topic.replicaAssignment); + } else { + newTopic = new NewTopic(topic.name.get(), topic.partitions, topic.replicationFactor.map(Integer::shortValue)); + } + + Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream() + .collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name))); + + newTopic.configs(configsMap); + CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic), + new CreateTopicsOptions().retryOnQuotaViolation(false)); + createResult.all().get(); + System.out.println("Created topic " + topic.name + "."); + } catch (ExecutionException e) { + if (e.getCause() == null) { + throw e; + } + if (!(e.getCause() instanceof TopicExistsException && topic.ifTopicDoesntExist())) { + throw (Exception) e.getCause(); + } + } + } + + public void listTopics(TopicCommandOptions opts) throws ExecutionException, InterruptedException { + String results = getTopics(opts.topic(), opts.excludeInternalTopics()) + .stream() + .collect(Collectors.joining("\n")); + System.out.println(results); + } + + public void alterTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException { + CommandTopicPartition topic = new CommandTopicPartition(opts); + List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics()); + ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); + + if (!topics.isEmpty()) { + Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo = adminClient.describeTopics(topics).topicNameValues(); + Map<String, NewPartitions> newPartitions = topics.stream() + .map(topicName -> topicNewPartitions(topic, topicsInfo, topicName)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + adminClient.createPartitions(newPartitions, new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get(); + } + } + + private AbstractMap.SimpleEntry<String, NewPartitions> topicNewPartitions( + CommandTopicPartition topic, + Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo, + String topicName) { + if (topic.hasReplicaAssignment()) { + try { + Integer startPartitionId = topicsInfo.get(topicName).get().partitions().size(); + Map<Integer, List<Integer>> replicaMap = topic.replicaAssignment.entrySet().stream() + .skip(startPartitionId) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List<List<Integer>> newAssignment = replicaMap.entrySet().stream() + .map(Map.Entry::getValue) + .collect(Collectors.toList()); + return new AbstractMap.SimpleEntry<>(topicName, NewPartitions.increaseTo(topic.partitions.get(), newAssignment)); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + return new AbstractMap.SimpleEntry<>(topicName, NewPartitions.increaseTo(topic.partitions.get())); + } + + public Map<TopicPartition, PartitionReassignment> listAllReassignments(Set<TopicPartition> topicPartitions) { + try { + return adminClient.listPartitionReassignments(topicPartitions).reassignments().get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof UnsupportedVersionException || cause instanceof ClusterAuthorizationException) { + log.debug("Couldn't query reassignments through the AdminClient API: " + cause.getMessage(), cause); + return Collections.emptyMap(); + } else { + throw new RuntimeException(e); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public void describeTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException { + // If topicId is provided and not zero, will use topicId regardless of topic name + Optional<Uuid> inputTopicId = opts.topicId() + .map(Uuid::fromString).filter(uuid -> uuid != Uuid.ZERO_UUID); + Boolean useTopicId = inputTopicId.isPresent(); + + List<Uuid> topicIds; + List<String> topics; + if (useTopicId) { + topicIds = getTopicIds(inputTopicId.get(), opts.excludeInternalTopics()); + topics = Collections.emptyList(); + } else { + topicIds = Collections.emptyList(); + topics = getTopics(opts.topic(), opts.excludeInternalTopics()); + + } + + // Only check topic name when topicId is not provided + if (useTopicId) { + ensureTopicIdExists(topicIds, inputTopicId.get(), !opts.ifExists()); + } else { + ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); + } + List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); + + if (!topicIds.isEmpty()) { + Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics = + adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); + topicDescriptions = new ArrayList<>(descTopics.values()); + } + + if (!topics.isEmpty()) { + Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); + topicDescriptions = new ArrayList<>(descTopics.values()); + } + + List<String> topicNames = topicDescriptions.stream() + .map(org.apache.kafka.clients.admin.TopicDescription::name) + .collect(Collectors.toList()); + Map<ConfigResource, KafkaFuture<Config>> allConfigs = adminClient.describeConfigs( + topicNames.stream() + .map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)) + .collect(Collectors.toList()) + ).values(); + List<Integer> liveBrokers = adminClient.describeCluster().nodes().get().stream() + .map(Node::id) + .collect(Collectors.toList()); + DescribeOptions describeOptions = new DescribeOptions(opts, new HashSet<>(liveBrokers)); + Set<TopicPartition> topicPartitions = topicDescriptions + .stream() + .flatMap(td -> td.partitions().stream() + .map(p -> new TopicPartition(td.name(), p.partition()))) + .collect(Collectors.toSet()); + Map<TopicPartition, PartitionReassignment> reassignments = listAllReassignments(topicPartitions); + for (org.apache.kafka.clients.admin.TopicDescription td : topicDescriptions) { + String topicName = td.name(); + Uuid topicId = td.topicId(); + Config config = allConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, topicName)).get(); + ArrayList<TopicPartitionInfo> sortedPartitions = new ArrayList<>(td.partitions()); + sortedPartitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); + printDescribeConfig(opts, describeOptions, reassignments, td, topicName, topicId, config, sortedPartitions); + printPartitionDescription(describeOptions, reassignments, td, topicName, config, sortedPartitions); + } + } + + private void printPartitionDescription(DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment> reassignments, org.apache.kafka.clients.admin.TopicDescription td, String topicName, Config config, ArrayList<TopicPartitionInfo> sortedPartitions) { + if (describeOptions.describePartitions) { + for (TopicPartitionInfo partition : sortedPartitions) { + PartitionReassignment reassignment = + reassignments.get(new TopicPartition(td.name(), partition.partition())); + PartitionDescription partitionDesc = new PartitionDescription(topicName, + partition, config, false, reassignment); + describeOptions.maybePrintPartitionDescription(partitionDesc); + } + } + } + + private void printDescribeConfig(TopicCommandOptions opts, DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment> reassignments, org.apache.kafka.clients.admin.TopicDescription td, String topicName, Uuid topicId, Config config, ArrayList<TopicPartitionInfo> sortedPartitions) { + if (describeOptions.describeConfigs) { + List<ConfigEntry> entries = new ArrayList<>(config.entries()); + boolean hasNonDefault = entries.stream().anyMatch(e -> !e.isDefault()); + if (!opts.reportOverriddenConfigs() || hasNonDefault) { + int numPartitions = td.partitions().size(); + TopicPartitionInfo firstPartition = sortedPartitions.get(0); + PartitionReassignment reassignment = + reassignments.get(new TopicPartition(td.name(), firstPartition.partition())); + TopicDescription topicDesc = new TopicDescription(topicName, topicId, + numPartitions, getReplicationFactor(firstPartition, reassignment), + config, false); + topicDesc.printDescription(); + } + } + } + + public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException { + List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics()); + ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); + adminClient.deleteTopics(Collections.unmodifiableList(topics), + new DeleteTopicsOptions().retryOnQuotaViolation(false) + ).all().get(); + } + + public List<String> getTopics(Optional<String> topicIncludeList, boolean excludeInternalTopics) throws ExecutionException, InterruptedException { + ListTopicsOptions listTopicsOptions = new ListTopicsOptions(); + if (!excludeInternalTopics) { + listTopicsOptions.listInternal(true); + } + + Set<String> allTopics = adminClient.listTopics(listTopicsOptions).names().get(); + return doGetTopics(allTopics.stream().sorted().collect(Collectors.toList()), topicIncludeList, excludeInternalTopics); + } + + public List<Uuid> getTopicIds(Uuid topicIdIncludeList, boolean excludeInternalTopics) throws ExecutionException, InterruptedException { + ListTopicsResult allTopics = excludeInternalTopics ? adminClient.listTopics() : + adminClient.listTopics(new ListTopicsOptions().listInternal(true)); + List<Uuid> allTopicIds = null; + allTopicIds = allTopics.listings().get().stream() + .map(TopicListing::topicId) + .sorted() + .collect(Collectors.toList()); + return allTopicIds.contains(topicIdIncludeList) ? + Collections.singletonList(topicIdIncludeList) : + Collections.emptyList(); + } + + @Override + public void close() throws Exception { + adminClient.close(); + } + } + + public static class TopicCommandOptions extends CommandDefaultOptions { + private final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt; + + private final ArgumentAcceptingOptionSpec<String> commandConfigOpt; + + private final OptionSpecBuilder listOpt; + + private final OptionSpecBuilder createOpt; + + private final OptionSpecBuilder deleteOpt; + + private final OptionSpecBuilder alterOpt; + + private final OptionSpecBuilder describeOpt; + + private final ArgumentAcceptingOptionSpec<String> topicOpt; + + private final ArgumentAcceptingOptionSpec<String> topicIdOpt; + + private final String nl; + + private static final String KAFKA_CONFIGS_CLI_SUPPORTS_ALTERING_TOPIC_CONFIGS_WITH_A_BOOTSTRAP_SERVER = + " (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option)"; + + private final ArgumentAcceptingOptionSpec<String> configOpt; + + private final ArgumentAcceptingOptionSpec<String> deleteConfigOpt; + + private final ArgumentAcceptingOptionSpec<Integer> partitionsOpt; + + private final ArgumentAcceptingOptionSpec<Integer> replicationFactorOpt; + + private final ArgumentAcceptingOptionSpec<String> replicaAssignmentOpt; + + private final OptionSpecBuilder reportUnderReplicatedPartitionsOpt; + + private final OptionSpecBuilder reportUnavailablePartitionsOpt; + + private final OptionSpecBuilder reportUnderMinIsrPartitionsOpt; + + private final OptionSpecBuilder reportAtMinIsrPartitionsOpt; + + private final OptionSpecBuilder topicsWithOverridesOpt; + + private final OptionSpecBuilder ifExistsOpt; + + private final OptionSpecBuilder ifNotExistsOpt; + + private final OptionSpecBuilder excludeInternalTopicOpt; + + private final Set<OptionSpec<?>> allTopicLevelOpts; + + private final Set<OptionSpecBuilder> allReplicationReportOpts; + + public TopicCommandOptions(String[] args) { + super(args); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " + + "This is used only with --bootstrap-server option for describing and altering broker configs.") + .withRequiredArg() + .describedAs("command config property file") + .ofType(String.class); + + + listOpt = parser.accepts("list", "List all available topics."); + createOpt = parser.accepts("create", "Create a new topic."); + deleteOpt = parser.accepts("delete", "Delete a topic"); + alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic."); + describeOpt = parser.accepts("describe", "List details for the given topics."); + topicOpt = parser.accepts("topic", "Alter the number of partitions and replica assignment. " + + "Update the configuration of an existing topic via --alter is no longer supported here" + + KAFKA_CONFIGS_CLI_SUPPORTS_ALTERING_TOPIC_CONFIGS_WITH_A_BOOTSTRAP_SERVER + ".") + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + topicIdOpt = parser.accepts("topic-id", "The topic-id to describe." + + "This is used only with --bootstrap-server option for describing topics.") + .withRequiredArg() + .describedAs("topic-id") + .ofType(String.class); + nl = System.getProperty("line.separator"); + + String configNames = LogConfig.configNames().stream() + .map(name -> "\t" + name) + .collect(Collectors.joining("nl")); + configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." + Review Comment: We had it in the original scala code. I removed it from the new code. ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreatePartitionsOptions; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class TopicCommand { + private static final Logger log = LoggerFactory.getLogger(TopicCommand.class); Review Comment: done ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreatePartitionsOptions; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class TopicCommand { + private static final Logger log = LoggerFactory.getLogger(TopicCommand.class); + + public static void main(String[] args) { + Exit.exit(mainNoExit(args)); + } + + private static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + TopicCommandOptions opts = new TopicCommandOptions(args); + opts.checkArgs(); Review Comment: Moved the `checkArgs` inside `TopicCommandOptions`. This will have some impact on the test as all the tests were originally running without validating the args -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org