OmniaGM commented on code in PR #13201:
URL: https://github.com/apache/kafka/pull/13201#discussion_r1297476207


##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -0,0 +1,1019 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreatePartitionsOptions;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.PartitionReassignment;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class TopicCommand {
+    private static final Logger log = LoggerFactory.getLogger(TopicCommand.class);
+
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    private static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        TopicCommandOptions opts = new TopicCommandOptions(args);
+        opts.checkArgs();
+        TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer());
+        int exitCode = 0;
+        try {
+            if (opts.hasCreateOption()) {
+                topicService.createTopic(opts);
+            } else if (opts.hasAlterOption()) {
+                topicService.alterTopic(opts);
+            } else if (opts.hasListOption()) {
+                topicService.listTopics(opts);
+            } else if (opts.hasDescribeOption()) {
+                topicService.describeTopic(opts);
+            } else if (opts.hasDeleteOption()) {
+                topicService.deleteTopic(opts);
+            }
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause != null) {
+                printException(cause);
+            } else {
+                printException(e);
+            }
+            exitCode = 1;
+        } catch (Throwable e) {
+            printException(e);
+            exitCode = 1;
+        } finally {
+            topicService.close();
+            Exit.exit(exitCode);
+        }
+
+    }
+
+    private static void printException(Throwable e) {
+        System.out.println("Error while executing topic command : " + e.getMessage());
+        log.error(Utils.stackTrace(e));
+    }
+
+    @SuppressWarnings("deprecation")
+    static Map<Integer, List<Integer>> parseReplicaAssignment(String replicaAssignmentList) {
+        String[] partitionList = replicaAssignmentList.split(",");
+        Map<Integer, List<Integer>> ret = new LinkedHashMap<Integer, List<Integer>>();
+        for (int i = 0; i < partitionList.length; i++) {
+            List<Integer> brokerList = Arrays.stream(partitionList[i].split(":"))
+                .map(String::trim)
+                .mapToInt(Integer::parseInt)
+                .boxed()
+                .collect(Collectors.toList());
+            Collection<Integer> duplicateBrokers = ToolsUtils.duplicates(brokerList);
+            if (!duplicateBrokers.isEmpty()) {
+                throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: " +
+                    duplicateBrokers.stream()
+                        .map(Object::toString)
+                        .collect(Collectors.joining(","))
+                );
+            }
+            ret.put(i, brokerList);
+            if (ret.get(i).size() != ret.get(0).size()) {
+                throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList);
+            }
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("deprecation")
+    private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
+        List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList())
+            .stream()
+            .map(s -> Arrays.asList(s.split("\\s*=\\s*")))
+            .collect(Collectors.toList());
+
+        if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) {
+            throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\".");
+        }
+
+        Properties props = new Properties();
+        configsToBeAdded.stream()
+            .forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim()));
+        LogConfig.validate(props);
+        if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
+            System.out.println("WARNING: The configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + "=" +
+                props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) + " is specified. " +
+                "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
+                "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.");
+        }
+        }
+        return props;
+    }
+
+    // It is possible for a reassignment to complete between the time we have fetched its state and the time
+    // we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor.
+    public static boolean isReassignmentInProgress(TopicPartitionInfo tpi, PartitionReassignment ra) {
+        // Reassignment is still in progress as long as the removing and adding replicas are still present
+        Set<Integer> allReplicaIds = tpi.replicas().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> changingReplicaIds = new HashSet<>();
+        if (ra != null) {
+            changingReplicaIds.addAll(ra.removingReplicas());
+            changingReplicaIds.addAll(ra.addingReplicas());
+        }
+        return allReplicaIds.stream().anyMatch(changingReplicaIds::contains);
+
+    }
+
+    private static Integer getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) {
+        return isReassignmentInProgress(tpi, reassignment) ?
+            reassignment.replicas().size() - reassignment.addingReplicas().size() :
+            tpi.replicas().size();
+    }
+
+    /**
+     * ensures topic existence and throws exception if topic doesn't exist
+     *
+     * @param foundTopics        Topics that were found to match the requested topic name.
+     * @param requestedTopic     Name of the topic that was requested.
+     * @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful.
+     *                           If set to true, the command will throw an exception if the topic with the
+     *                           requested name does not exist.
+     */
+    private static void ensureTopicExists(List<String> foundTopics, String requestedTopic, Boolean requireTopicExists) {
+        // If no topic name was mentioned, do not need to throw exception.
+        if (requestedTopic != null && !requestedTopic.isEmpty() && requireTopicExists && foundTopics.isEmpty()) {
+            // If given topic doesn't exist then throw exception
+            throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic));
+        }
+    }
+
+    private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, Boolean excludeInternalTopics) {
+        if (topicIncludeList.isPresent()) {
+            IncludeList topicsFilter = new IncludeList(topicIncludeList.get());
+            return allTopics.stream()
+                .filter(topic -> topicsFilter.isTopicAllowed(topic, excludeInternalTopics))
+                .collect(Collectors.toList());
+        } else {
+            return allTopics.stream()
+                .filter(topic -> !(Topic.isInternal(topic) && excludeInternalTopics))
+                .collect(Collectors.toList());
+        }
+    }
+
+    /**
+     * ensures topic existence and throws exception if topic doesn't exist
+     *
+     * @param foundTopicIds        Topics that were found to match the requested topic id.
+     * @param requestedTopicId     Id of the topic that was requested.
+     * @param requireTopicIdExists Indicates if the topic needs to exist for the operation to be successful.
+     *                             If set to true, the command will throw an exception if the topic with the
+     *                             requested id does not exist.
+     */
+    private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, Boolean requireTopicIdExists) {
+        // If no topic id was mentioned, do not need to throw exception.
+        if (requestedTopicId != null && requireTopicIdExists && foundTopicIds.isEmpty()) {
+            // If given topicId doesn't exist then throw exception
+            throw new IllegalArgumentException(String.format("TopicId '%s' does not exist as expected", requestedTopicId));
+        }
+    }
+
+    static class CommandTopicPartition {
+        private final TopicCommandOptions opts;
+        private final Optional<String> name;
+        private final Optional<Integer> replicationFactor;
+        private final Optional<Integer> partitions;
+        private final Map<Integer, List<Integer>> replicaAssignment;
+        private final Properties configsToAdd;
+
+        public CommandTopicPartition(TopicCommandOptions options) {
+            opts = options;
+            name = options.topic();
+            partitions = options.partitions();
+            replicationFactor = options.replicationFactor();
+            replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap());
+            configsToAdd = parseTopicConfigsToBeAdded(options);
+        }
+
+        public Boolean hasReplicaAssignment() {
+            return !replicaAssignment.isEmpty();
+        }
+
+        public Boolean hasPartitions() {
+            return partitions.isPresent();
+        }
+
+        public Boolean ifTopicDoesntExist() {
+            return opts.ifNotExists();
+        }
+    }
+
+    static class TopicDescription {
+        private final String topic;
+        private final Uuid topicId;
+        private final Integer numPartitions;
+        private final Integer replicationFactor;
+        private final Config config;
+        private final Boolean markedForDeletion;
+
+        public TopicDescription(String topic, Uuid topicId, Integer numPartitions, Integer replicationFactor, Config config, Boolean markedForDeletion) {
+            this.topic = topic;
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.replicationFactor = replicationFactor;
+            this.config = config;
+            this.markedForDeletion = markedForDeletion;
+        }
+
+        public void printDescription() {
+            String configsAsString = config.entries().stream()
+                .filter(config -> !config.isDefault())
+                .map(ce -> String.format("%s=%s", ce.name(), ce.value()))
+                .collect(Collectors.joining(","));
+            System.out.print(String.format("Topic: %s", topic));
+            if (!topicId.equals(Uuid.ZERO_UUID))
+                System.out.print(String.format("\tTopicId: %s", topicId));
+            System.out.print(String.format("\tPartitionCount: %s", numPartitions));
+            System.out.print(String.format("\tReplicationFactor: %s", replicationFactor));
+            System.out.print(String.format("\tConfigs: %s", configsAsString));
+            System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : "");
+            System.out.println();
+        }
+    }
+
+    static class PartitionDescription {
+        private final String topic;
+        private final TopicPartitionInfo info;
+        private final Config config;
+        private final Boolean markedForDeletion;
+        private final PartitionReassignment reassignment;
+
+        PartitionDescription(String topic,
+                             TopicPartitionInfo info,
+                             Config config,
+                             Boolean markedForDeletion,
+                             PartitionReassignment reassignment) {
+            this.topic = topic;
+            this.info = info;
+            this.config = config;
+            this.markedForDeletion = markedForDeletion;
+            this.reassignment = reassignment;
+        }
+
+        public Integer minIsrCount() {
+            return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+        }
+
+        public Boolean isUnderReplicated() {
+            return getReplicationFactor(info, reassignment) - info.isr().size() > 0;
+        }
+
+        public boolean hasLeader() {
+            return info.leader() != null;
+        }
+
+        public Boolean isUnderMinIsr() {
+            return !hasLeader() ||  info.isr().size() < minIsrCount();
+        }
+
+        public Boolean isAtMinIsrPartitions() {
+            return minIsrCount() == info.isr().size();
+
+        }
+
+        public Boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
+            return !hasLeader() || !liveBrokers.contains(info.leader().id());
+        }
+
+        public void printDescription() {
+            System.out.print("\tTopic: " + topic);
+            System.out.print("\tPartition: " + info.partition());
+            System.out.print("\tLeader: " + (hasLeader() ? info.leader().id() : "none"));
+            System.out.print("\tReplicas: " + info.replicas().stream()
+                .map(node -> Integer.toString(node.id()))
+                .collect(Collectors.joining(",")));
+            System.out.print("\tIsr: " + info.isr().stream()
+                .map(node -> Integer.toString(node.id()))
+                .collect(Collectors.joining(",")));
+            if (reassignment != null) {
+                System.out.print("\tAdding Replicas: " + reassignment.addingReplicas().stream()
+                    .map(node -> node.toString())
+                    .collect(Collectors.joining(",")));
+                System.out.print("\tRemoving Replicas: " + reassignment.removingReplicas().stream()
+                    .map(node -> node.toString())
+                    .collect(Collectors.joining(",")));
+            }
+            System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : "");
+            System.out.println();
+        }
+
+    }
+
+    static class DescribeOptions {
+        private final TopicCommandOptions opts;
+        private final Set<Integer> liveBrokers;
+        private final boolean describeConfigs;
+        private final boolean describePartitions;
+
+        public DescribeOptions(TopicCommandOptions opts, Set<Integer> liveBrokers) {
+            this.opts = opts;
+            this.liveBrokers = liveBrokers;
+            this.describeConfigs = !opts.reportUnavailablePartitions() &&
+                !opts.reportUnderReplicatedPartitions() &&
+                !opts.reportUnderMinIsrPartitions() &&
+                !opts.reportAtMinIsrPartitions();
+            this.describePartitions = !opts.reportOverriddenConfigs();
+        }
+
+        private boolean shouldPrintUnderReplicatedPartitions(PartitionDescription partitionDescription) {
+            return opts.reportUnderReplicatedPartitions() && partitionDescription.isUnderReplicated();
+        }
+
+        private boolean shouldPrintUnavailablePartitions(PartitionDescription partitionDescription) {
+            return opts.reportUnavailablePartitions() && partitionDescription.hasUnavailablePartitions(liveBrokers);
+        }
+
+        private boolean shouldPrintUnderMinIsrPartitions(PartitionDescription partitionDescription) {
+            return opts.reportUnderMinIsrPartitions() && partitionDescription.isUnderMinIsr();
+        }
+
+        private boolean shouldPrintAtMinIsrPartitions(PartitionDescription partitionDescription) {
+            return opts.reportAtMinIsrPartitions() && partitionDescription.isAtMinIsrPartitions();
+        }
+
+        private boolean shouldPrintTopicPartition(PartitionDescription partitionDesc) {
+            return describeConfigs ||
+                shouldPrintUnderReplicatedPartitions(partitionDesc) ||
+                shouldPrintUnavailablePartitions(partitionDesc) ||
+                shouldPrintUnderMinIsrPartitions(partitionDesc) ||
+                shouldPrintAtMinIsrPartitions(partitionDesc);
+        }
+
+        public void maybePrintPartitionDescription(PartitionDescription desc) {
+            if (shouldPrintTopicPartition(desc)) {
+                desc.printDescription();
+            }
+        }
+    }
+
+    public static class TopicService implements AutoCloseable {
+        private Admin adminClient;
+
+        public TopicService(Properties commandConfig, Optional<String> bootstrapServer) {
+            this.adminClient = createAdminClient(commandConfig, bootstrapServer);
+        }
+
+        public TopicService(Admin admin) {
+            this.adminClient = admin;
+        }
+
+        private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) {
+            if (bootstrapServer.isPresent()) {
+                commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
+            }
+            return Admin.create(commandConfig);
+        }
+
+        public void createTopic(TopicCommandOptions opts) throws Exception {
+            CommandTopicPartition topic = new CommandTopicPartition(opts);
+            if (Topic.hasCollisionChars(topic.name.get())) {
+                System.out.println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could " +
+                    "collide. To avoid issues it is best to use either, but not both.");
+            }
+            createTopic(topic);
+        }
+
+        public void createTopic(CommandTopicPartition topic) throws Exception {
+            if (topic.replicationFactor.filter(rf -> rf > Short.MAX_VALUE || rf < 1).isPresent()) {
+                throw new IllegalArgumentException("The replication factor must be between 1 and " + Short.MAX_VALUE + " inclusive");
+            }
+            if (topic.partitions.filter(p -> p < 1).isPresent()) {
+                throw new IllegalArgumentException("The partitions must be greater than 0");
+            }
+
+            try {
+                NewTopic newTopic;
+                if (topic.hasReplicaAssignment()) {
+                    newTopic = new NewTopic(topic.name.get(), topic.replicaAssignment);
+                } else {
+                    newTopic = new NewTopic(topic.name.get(), topic.partitions, topic.replicationFactor.map(Integer::shortValue));
+                }
+
+                Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream()
+                    .collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));
+
+                newTopic.configs(configsMap);
+                CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
+                    new CreateTopicsOptions().retryOnQuotaViolation(false));
+                createResult.all().get();
+                System.out.println("Created topic " + topic.name.get() + ".");
+            } catch (ExecutionException e) {
+                if (e.getCause() == null) {
+                    throw e;
+                }
+                if (!(e.getCause() instanceof TopicExistsException && topic.ifTopicDoesntExist())) {
+                    throw (Exception) e.getCause();
+                }
+            }
+        }
+
+        public void listTopics(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
+            String results = getTopics(opts.topic(), opts.excludeInternalTopics())
+                .stream()
+                .collect(Collectors.joining("\n"));
+            System.out.println(results);
+        }
+
+        public void alterTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
+            CommandTopicPartition topic = new CommandTopicPartition(opts);
+            List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
+            ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
+
+            if (!topics.isEmpty()) {
+                Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo = adminClient.describeTopics(topics).topicNameValues();
+                Map<String, NewPartitions> newPartitions = topics.stream()
+                    .map(topicName -> topicNewPartitions(topic, topicsInfo, topicName))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                adminClient.createPartitions(newPartitions, new CreatePartitionsOptions().retryOnQuotaViolation(false)).all().get();
+            }
+        }
+
+        private AbstractMap.SimpleEntry<String, NewPartitions> topicNewPartitions(
+            CommandTopicPartition topic,
+            Map<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>> topicsInfo,
+            String topicName) {
+            if (topic.hasReplicaAssignment()) {
+                try {
+                    Integer startPartitionId = topicsInfo.get(topicName).get().partitions().size();
+                    Map<Integer, List<Integer>> replicaMap = topic.replicaAssignment.entrySet().stream()
+                        .skip(startPartitionId)
+                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                    List<List<Integer>> newAssignment = replicaMap.entrySet().stream()
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
+                    return new AbstractMap.SimpleEntry<>(topicName, NewPartitions.increaseTo(topic.partitions.get(), newAssignment));
+                } catch (InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            return new AbstractMap.SimpleEntry<>(topicName, NewPartitions.increaseTo(topic.partitions.get()));
+        }
+
+        public Map<TopicPartition, PartitionReassignment> listAllReassignments(Set<TopicPartition> topicPartitions) {
+            try {
+                return adminClient.listPartitionReassignments(topicPartitions).reassignments().get();
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof UnsupportedVersionException || cause instanceof ClusterAuthorizationException) {
+                    log.debug("Couldn't query reassignments through the AdminClient API: " + cause.getMessage(), cause);
+                    return Collections.emptyMap();
+                } else {
+                    throw new RuntimeException(e);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void describeTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
+            // If topicId is provided and not zero, will use topicId regardless of topic name
+            Optional<Uuid> inputTopicId = opts.topicId()
+                .map(Uuid::fromString).filter(uuid -> !uuid.equals(Uuid.ZERO_UUID));
+            Boolean useTopicId = inputTopicId.isPresent();
+
+            List<Uuid> topicIds;
+            List<String> topics;
+            if (useTopicId) {
+                topicIds = getTopicIds(inputTopicId.get(), opts.excludeInternalTopics());
+                topics = Collections.emptyList();
+            } else {
+                topicIds = Collections.emptyList();
+                topics = getTopics(opts.topic(), opts.excludeInternalTopics());
+
+            }
+
+            // Only check topic name when topicId is not provided
+            if (useTopicId) {
+                ensureTopicIdExists(topicIds, inputTopicId.get(), !opts.ifExists());
+            } else {
+                ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
+            }
+            List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
+
+            if (!topicIds.isEmpty()) {
+                Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics =
+                    adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
+                topicDescriptions = new ArrayList<>(descTopics.values());
+            }
+
+            if (!topics.isEmpty()) {
+                Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
+                    adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
+                topicDescriptions = new ArrayList<>(descTopics.values());
+            }
+
+            List<String> topicNames = topicDescriptions.stream()
+                .map(org.apache.kafka.clients.admin.TopicDescription::name)
+                .collect(Collectors.toList());
+            Map<ConfigResource, KafkaFuture<Config>> allConfigs = adminClient.describeConfigs(
+                topicNames.stream()
+                    .map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name))
+                    .collect(Collectors.toList())
+            ).values();
+            List<Integer> liveBrokers = adminClient.describeCluster().nodes().get().stream()
+                .map(Node::id)
+                .collect(Collectors.toList());
+            DescribeOptions describeOptions = new DescribeOptions(opts, new HashSet<>(liveBrokers));
+            Set<TopicPartition> topicPartitions = topicDescriptions
+                .stream()
+                .flatMap(td -> td.partitions().stream()
+                    .map(p -> new TopicPartition(td.name(), p.partition())))
+                .collect(Collectors.toSet());
+            Map<TopicPartition, PartitionReassignment> reassignments = listAllReassignments(topicPartitions);
+            for (org.apache.kafka.clients.admin.TopicDescription td : topicDescriptions) {
+                String topicName = td.name();
+                Uuid topicId = td.topicId();
+                Config config = allConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, topicName)).get();
+                ArrayList<TopicPartitionInfo> sortedPartitions = new ArrayList<>(td.partitions());
+                sortedPartitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                printDescribeConfig(opts, describeOptions, reassignments, td, topicName, topicId, config, sortedPartitions);
+                printPartitionDescription(describeOptions, reassignments, td, topicName, config, sortedPartitions);
+            }
+        }
+
+        private void printPartitionDescription(DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment> reassignments, org.apache.kafka.clients.admin.TopicDescription td, String topicName, Config config, ArrayList<TopicPartitionInfo> sortedPartitions) {
+            if (describeOptions.describePartitions) {
+                for (TopicPartitionInfo partition : sortedPartitions) {
+                    PartitionReassignment reassignment =
+                        reassignments.get(new TopicPartition(td.name(), partition.partition()));
+                    PartitionDescription partitionDesc = new PartitionDescription(topicName,
+                        partition, config, false, reassignment);
+                    describeOptions.maybePrintPartitionDescription(partitionDesc);
+                }
+            }
+        }
+
+        private void printDescribeConfig(TopicCommandOptions opts, DescribeOptions describeOptions, Map<TopicPartition, PartitionReassignment> reassignments, org.apache.kafka.clients.admin.TopicDescription td, String topicName, Uuid topicId, Config config, ArrayList<TopicPartitionInfo> sortedPartitions) {
+            if (describeOptions.describeConfigs) {
+                List<ConfigEntry> entries = new ArrayList<>(config.entries());
+                boolean hasNonDefault = entries.stream().anyMatch(e -> !e.isDefault());
+                if (!opts.reportOverriddenConfigs() || hasNonDefault) {
+                    int numPartitions = td.partitions().size();
+                    TopicPartitionInfo firstPartition = sortedPartitions.get(0);
+                    PartitionReassignment reassignment =
+                        reassignments.get(new TopicPartition(td.name(), firstPartition.partition()));
+                    TopicDescription topicDesc = new TopicDescription(topicName, topicId,
+                        numPartitions, getReplicationFactor(firstPartition, reassignment),
+                        config, false);
+                    topicDesc.printDescription();
+                }
+            }
+        }
+
+        public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
+            List<String> topics = getTopics(opts.topic(), opts.excludeInternalTopics());
+            ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
+            adminClient.deleteTopics(Collections.unmodifiableList(topics),
+                new DeleteTopicsOptions().retryOnQuotaViolation(false)
+            ).all().get();
+        }
+
+        public List<String> getTopics(Optional<String> topicIncludeList, boolean excludeInternalTopics) throws ExecutionException, InterruptedException {
+            ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
+            if (!excludeInternalTopics) {
+                listTopicsOptions.listInternal(true);
+            }
+
+            Set<String> allTopics = adminClient.listTopics(listTopicsOptions).names().get();
+            return doGetTopics(allTopics.stream().sorted().collect(Collectors.toList()), topicIncludeList, excludeInternalTopics);
+        }
+
+        public List<Uuid> getTopicIds(Uuid topicIdIncludeList, boolean excludeInternalTopics) throws ExecutionException, InterruptedException {
+            ListTopicsResult allTopics = excludeInternalTopics ? adminClient.listTopics() :
+                adminClient.listTopics(new ListTopicsOptions().listInternal(true));
+            List<Uuid> allTopicIds = allTopics.listings().get().stream()
+                .map(TopicListing::topicId)
+                .sorted()
+                .collect(Collectors.toList());
+            return allTopicIds.contains(topicIdIncludeList) ?
+                Collections.singletonList(topicIdIncludeList) :
+                Collections.emptyList();
+        }
+
+        @Override
+        public void close() throws Exception {
+            adminClient.close();
+        }
+    }
+
+    public static class TopicCommandOptions extends CommandDefaultOptions {
+        private final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+
+        private final OptionSpecBuilder listOpt;
+
+        private final OptionSpecBuilder createOpt;
+
+        private final OptionSpecBuilder deleteOpt;
+
+        private final OptionSpecBuilder alterOpt;
+
+        private final OptionSpecBuilder describeOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> topicOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> topicIdOpt;
+
+        private final String nl;
+
+        private static final String KAFKA_CONFIGS_CLI_SUPPORTS_ALTERING_TOPIC_CONFIGS_WITH_A_BOOTSTRAP_SERVER =
+                " (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option)";
+
+        private final ArgumentAcceptingOptionSpec<String> configOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> deleteConfigOpt;
+
+        private final ArgumentAcceptingOptionSpec<Integer> partitionsOpt;
+
+        private final ArgumentAcceptingOptionSpec<Integer> replicationFactorOpt;
+
+        private final ArgumentAcceptingOptionSpec<String> replicaAssignmentOpt;
+
+        private final OptionSpecBuilder reportUnderReplicatedPartitionsOpt;
+
+        private final OptionSpecBuilder reportUnavailablePartitionsOpt;
+
+        private final OptionSpecBuilder reportUnderMinIsrPartitionsOpt;
+
+        private final OptionSpecBuilder reportAtMinIsrPartitionsOpt;
+
+        private final OptionSpecBuilder topicsWithOverridesOpt;
+
+        private final OptionSpecBuilder ifExistsOpt;
+
+        private final OptionSpecBuilder ifNotExistsOpt;
+
+        private final OptionSpecBuilder excludeInternalTopicOpt;
+
+        private final Set<OptionSpec<?>> allTopicLevelOpts;
+
+        private final Set<OptionSpecBuilder> allReplicationReportOpts;
+
+        public TopicCommandOptions(String[] args) {
+            super(args);
+            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.")
+                .withRequiredArg()
+                .describedAs("server to connect to")
+                .ofType(String.class);
+            commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " +
+                    "This is used only with --bootstrap-server option for describing and altering broker configs.")
+                .withRequiredArg()
+                .describedAs("command config property file")
+                .ofType(String.class);
+
+
+            listOpt = parser.accepts("list", "List all available topics.");
+            createOpt = parser.accepts("create", "Create a new topic.");
+            deleteOpt = parser.accepts("delete", "Delete a topic");
+            alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.");
+            describeOpt = parser.accepts("describe", "List details for the given topics.");
+            topicOpt = parser.accepts("topic", "Alter the number of partitions and replica assignment. " +
+                            "Update the configuration of an existing topic via --alter is no longer supported here" +

Review Comment:
   This was a mistake introduced while fixing a conflict with trunk.


