OmniaGM commented on code in PR #13201:
URL: https://github.com/apache/kafka/pull/13201#discussion_r1297475863


##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -0,0 +1,1019 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreatePartitionsOptions;
+import org.apache.kafka.clients.admin.CreateTopicsOptions;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.PartitionReassignment;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicCollection;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.TopicFilter.IncludeList;
+import org.apache.kafka.storage.internals.log.LogConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class TopicCommand {
+    private static final Logger log = 
LoggerFactory.getLogger(TopicCommand.class);
+
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    /**
+     * Runs {@link #execute(String...)} and converts the outcome into a status code.
+     *
+     * @param args raw command-line arguments
+     * @return 0 when {@code execute} returns normally, 1 when it throws
+     */
+    private static int mainNoExit(String... args) {
+        // Assume failure; only a normal return from execute() flips this to success.
+        int status = 1;
+        try {
+            execute(args);
+            status = 0;
+        } catch (TerseException e) {
+            // Terse failures print the message only, without a stack trace.
+            System.err.println(e.getMessage());
+        } catch (Throwable e) {
+            System.err.println(e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+        }
+        return status;
+    }
+
+    /**
+     * Parses the arguments, creates a {@code TopicService} and runs the first action selected by the
+     * options, checked in the order create, alter, list, describe, delete.
+     *
+     * <p>Argument parsing and validation happen before the {@code try} block, so failures there
+     * propagate to the caller. Failures of the selected action are printed via
+     * {@link #printException(Throwable)}; afterwards the service is always closed and
+     * {@code Exit.exit} is always invoked with 0 on success or 1 on failure.
+     *
+     * @param args raw command-line arguments
+     * @throws Exception if option parsing, validation or service creation fails
+     */
+    static void execute(String... args) throws Exception {
+        TopicCommandOptions options = new TopicCommandOptions(args);
+        options.checkArgs();
+        TopicService service = new TopicService(options.commandConfig(), options.bootstrapServer());
+        int exitCode = 0;
+        try {
+            if (options.hasCreateOption()) {
+                service.createTopic(options);
+            } else if (options.hasAlterOption()) {
+                service.alterTopic(options);
+            } else if (options.hasListOption()) {
+                service.listTopics(options);
+            } else if (options.hasDescribeOption()) {
+                service.describeTopic(options);
+            } else if (options.hasDeleteOption()) {
+                service.deleteTopic(options);
+            }
+        } catch (ExecutionException e) {
+            // Report the wrapped cause when there is one; otherwise report the wrapper itself.
+            Throwable cause = e.getCause();
+            printException(cause != null ? cause : e);
+            exitCode = 1;
+        } catch (Throwable e) {
+            printException(e);
+            exitCode = 1;
+        } finally {
+            service.close();
+            Exit.exit(exitCode);
+        }
+    }
+
+    /**
+     * Prints a one-line error summary to standard output and logs the full stack trace at error level.
+     *
+     * @param e the failure to report
+     */
+    private static void printException(Throwable e) {
+        String summary = "Error while executing topic command : " + e.getMessage();
+        System.out.println(summary);
+        log.error(Utils.stackTrace(e));
+    }
+
+    /**
+     * Parses a replica assignment string into a map from partition index to broker ids.
+     *
+     * <p>Partitions are separated by {@code ,} and the broker ids within a partition by {@code :}.
+     * Partition indexes are assigned by position, starting at 0, and insertion order is preserved.
+     *
+     * @param replicaAssignmentList the assignment string to parse
+     * @return partition index mapped to its list of broker ids
+     * @throws AdminCommandFailedException if a partition lists the same broker more than once
+     * @throws AdminOperationException if a partition has a different number of replicas than partition 0
+     * @throws NumberFormatException if a broker id is not a valid integer
+     */
+    @SuppressWarnings("deprecation")
+    static Map<Integer, List<Integer>> parseReplicaAssignment(String replicaAssignmentList) {
+        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
+        int partition = 0;
+        for (String replicaSpec : replicaAssignmentList.split(",")) {
+            List<Integer> brokers = Stream.of(replicaSpec.split(":"))
+                .map(String::trim)
+                .map(Integer::valueOf)
+                .collect(Collectors.toList());
+            Collection<Integer> repeated = ToolsUtils.duplicates(brokers);
+            if (!repeated.isEmpty()) {
+                String repeatedIds = repeated.stream()
+                    .map(Object::toString)
+                    .collect(Collectors.joining(","));
+                throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: " + repeatedIds);
+            }
+            assignment.put(partition, brokers);
+            // Partition 0 is the reference: every partition must have the same number of replicas.
+            if (brokers.size() != assignment.get(0).size()) {
+                throw new AdminOperationException("Partition " + partition + " has different replication factor: " + brokers);
+            }
+            partition++;
+        }
+        return assignment;
+    }
+
+    /**
+     * Builds the topic-level configuration overrides requested on the command line.
+     *
+     * <p>Each entry must have the form {@code key=val}; whitespace around the {@code =} is ignored.
+     * The resulting properties are validated with {@code LogConfig.validate}. A warning is printed
+     * to standard output when the deprecated message format version config is present.
+     *
+     * @param opts parsed command-line options
+     * @return the parsed overrides; empty when no config option was given
+     * @throws IllegalArgumentException if any entry does not split into exactly a key and a value
+     */
+    @SuppressWarnings("deprecation")
+    private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
+        List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList())
+            .stream()
+            .map(s -> Arrays.asList(s.split("\\s*=\\s*")))
+            .collect(Collectors.toList());
+
+        if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) {
+            throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\".");
+        }
+
+        Properties props = new Properties();
+        configsToBeAdded.forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim()));
+        LogConfig.validate(props);
+        if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
+            // Java has no string interpolation: the key and value must be concatenated explicitly,
+            // otherwise the "${...}" placeholders are printed literally.
+            System.out.println("WARNING: The configuration " + TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + "=" +
+                props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG) + " is specified. " +
+                "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
+                "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.");
+        }
+        return props;
+    }
+
+    /**
+     * Tells whether the given reassignment still applies to the partition.
+     *
+     * <p>It is possible for a reassignment to complete between the time we have fetched its state and
+     * the time we fetch partition metadata. In this case, we ignore the reassignment when determining
+     * replication factor.
+     *
+     * @param tpi current partition metadata
+     * @param ra  reassignment state for the partition, or {@code null} if there is none
+     * @return {@code true} if any current replica is among the replicas being added or removed
+     */
+    public static boolean isReassignmentInProgress(TopicPartitionInfo tpi, PartitionReassignment ra) {
+        Set<Integer> currentReplicas = tpi.replicas().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> movingReplicas = new HashSet<>();
+        if (ra != null) {
+            movingReplicas.addAll(ra.removingReplicas());
+            movingReplicas.addAll(ra.addingReplicas());
+        }
+        // The reassignment is still in progress as long as a removing or adding replica is still present.
+        return !Collections.disjoint(currentReplicas, movingReplicas);
+    }
+
+    /**
+     * Computes the replication factor of a partition.
+     *
+     * @param tpi          current partition metadata
+     * @param reassignment reassignment state for the partition, or {@code null} if there is none
+     * @return the current replica count; while a reassignment is in progress, the reassignment's
+     *         replica count minus the replicas still being added
+     */
+    private static Integer getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) {
+        if (!isReassignmentInProgress(tpi, reassignment)) {
+            return tpi.replicas().size();
+        }
+        return reassignment.replicas().size() - reassignment.addingReplicas().size();
+    }
+
+    /**
+     * ensures topic existence and throws exception if topic doesn't exist
+     *
+     * @param foundTopics        Topics that were found to match the requested topic name.
+     * @param requestedTopic     Name of the topic that was requested; {@code null} or empty means
+     *                           no topic name was given.
+     * @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful.
+     *                           If set to true, the command will throw an exception if the topic with the
+     *                           requested name does not exist.
+     * @throws IllegalArgumentException if a topic was requested, it is required to exist, and
+     *                                  {@code foundTopics} is empty
+     */
+    private static void ensureTopicExists(List<String> foundTopics, String requestedTopic, Boolean requireTopicExists) {
+        // If no topic name was mentioned, do not need to throw exception.
+        // The null check must come before isEmpty(), otherwise a null name throws NullPointerException.
+        boolean topicRequested = requestedTopic != null && !requestedTopic.isEmpty();
+        if (topicRequested && requireTopicExists && foundTopics.isEmpty()) {
+            // If given topic doesn't exist then throw exception
+            throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic));
+        }
+    }
+
+    /**
+     * Selects the topics to operate on from the full topic list.
+     *
+     * @param allTopics             every known topic name
+     * @param topicIncludeList      optional include-list expression; when present, only topics accepted
+     *                              by an {@code IncludeList} built from it are kept
+     * @param excludeInternalTopics whether internal topics should be left out
+     * @return the selected topic names, in the order they appear in {@code allTopics}
+     */
+    private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, Boolean excludeInternalTopics) {
+        List<String> selected = new ArrayList<>();
+        if (topicIncludeList.isPresent()) {
+            IncludeList topicsFilter = new IncludeList(topicIncludeList.get());
+            for (String topic : allTopics) {
+                if (topicsFilter.isTopicAllowed(topic, excludeInternalTopics)) {
+                    selected.add(topic);
+                }
+            }
+        } else {
+            for (String topic : allTopics) {
+                boolean hidden = Topic.isInternal(topic) && excludeInternalTopics;
+                if (!hidden) {
+                    selected.add(topic);
+                }
+            }
+        }
+        return selected;
+    }
+
+    /**
+     * Verifies that a requested topic id was found, throwing if it is required but missing.
+     *
+     * @param foundTopicIds        Topic ids that were found to match the requested topic id.
+     * @param requestedTopicId     Id of the topic that was requested, or {@code null} if none was given.
+     * @param requireTopicIdExists Whether the topic must exist for the operation to succeed.
+     * @throws IllegalArgumentException if an id was requested, it is required to exist, and
+     *                                  {@code foundTopicIds} is empty
+     */
+    private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, Boolean requireTopicIdExists) {
+        // No topic id was mentioned: nothing to verify.
+        if (requestedTopicId == null) {
+            return;
+        }
+        // The caller tolerates a missing topic.
+        if (!requireTopicIdExists) {
+            return;
+        }
+        if (!foundTopicIds.isEmpty()) {
+            return;
+        }
+        // The given topic id doesn't exist.
+        throw new IllegalArgumentException(String.format("TopicId '%s' does not exist as expected", requestedTopicId));
+    }
+
+    static class CommandTopicPartition {
+        private final TopicCommandOptions opts;
+        private final Optional<String> name;
+        private final Optional<Integer> replicationFactor;
+        private final Optional<Integer> partitions;
+        private final Map<Integer, List<Integer>> replicaAssignment;
+        private final Properties configsToAdd;
+
+        public CommandTopicPartition(TopicCommandOptions options) {
+            opts = options;
+            name = options.topic();
+            partitions = options.partitions();
+            replicationFactor = options.replicationFactor();
+            replicaAssignment = 
options.replicaAssignment().orElse(Collections.emptyMap());
+            configsToAdd = parseTopicConfigsToBeAdded(options);
+        }
+
+        public Boolean hasReplicaAssignment() {
+            return !replicaAssignment.isEmpty();
+        }
+
+        public Boolean hasPartitions() {
+            return partitions.isPresent();
+        }
+
+        public Boolean ifTopicDoesntExist() {
+            return opts.ifNotExists();
+        }
+    }
+
+    /**
+     * Printable topic-level summary: name, id, partition count, replication factor, non-default
+     * configs and the marked-for-deletion flag.
+     */
+    static class TopicDescription {
+        private final String topic;
+        private final Uuid topicId;
+        private final Integer numPartitions;
+        private final Integer replicationFactor;
+        private final Config config;
+        private final Boolean markedForDeletion;
+
+        public TopicDescription(String topic, Uuid topicId, Integer numPartitions, Integer replicationFactor, Config config, Boolean markedForDeletion) {
+            this.topic = topic;
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.replicationFactor = replicationFactor;
+            this.config = config;
+            this.markedForDeletion = markedForDeletion;
+        }
+
+        /**
+         * Prints the summary as a single tab-separated line on standard output.
+         * Only config entries that are not defaults are listed, and the topic id is omitted
+         * when it equals {@code Uuid.ZERO_UUID}.
+         */
+        public void printDescription() {
+            String configsAsString = config.entries().stream()
+                .filter(ce -> !ce.isDefault())
+                .map(ce -> String.format("%s=%s", ce.name(), ce.value()))
+                .collect(Collectors.joining(","));
+            System.out.print(String.format("Topic: %s", topic));
+            // Compare by value: a reference comparison would still print an all-zero id
+            // that is held in a different Uuid instance than the ZERO_UUID constant.
+            if (!Uuid.ZERO_UUID.equals(topicId)) {
+                System.out.print(String.format("\tTopicId: %s", topicId));
+            }
+            System.out.print(String.format("\tPartitionCount: %s", numPartitions));
+            System.out.print(String.format("\tReplicationFactor: %s", replicationFactor));
+            System.out.print(String.format("\tConfigs: %s", configsAsString));
+            System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : "");
+            System.out.println();
+        }
+    }
+
+    static class PartitionDescription {
+        private final String topic;
+        private final TopicPartitionInfo info;
+        private final Config config;
+        private final Boolean markedForDeletion;
+        private final PartitionReassignment reassignment;
+
+        PartitionDescription(String topic,
+                             TopicPartitionInfo info,
+                             Config config,
+                             Boolean markedForDeletion,
+                             PartitionReassignment reassignment) {
+            this.topic = topic;
+            this.info = info;
+            this.config = config;
+            this.markedForDeletion = markedForDeletion;
+            this.reassignment = reassignment;
+        }
+
+        public Integer minIsrCount() {
+            return 
Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
+        }
+
+        public Boolean isUnderReplicated() {
+            return getReplicationFactor(info, reassignment) - 
info.isr().size() > 0;
+        }
+
+        public boolean hasLeader() {
+            return info.leader() != null;
+        }
+
+        public Boolean isUnderMinIsr() {
+            return !hasLeader() ||  info.isr().size() < minIsrCount();
+        }
+
+        public Boolean isAtMinIsrPartitions() {
+            return minIsrCount() == info.isr().size();
+
+        }
+
+        public Boolean hasUnavailablePartitions(Set<Integer> liveBrokers) {
+            return !hasLeader() || !liveBrokers.contains(info.leader().id());
+        }
+
+        public void printDescription() {
+            System.out.print("\tTopic: " + topic);
+            System.out.print("\tPartition: " + info.partition());
+            System.out.print("\tLeader: " + (hasLeader() ? info.leader().id() 
: "none"));
+            System.out.print("\tReplicas: " + info.replicas().stream()
+                .map(node -> Integer.toString(node.id()))
+                .collect(Collectors.joining(",")));
+            System.out.print("\tIsr: " + info.isr().stream()
+                .map(node -> Integer.toString(node.id()))
+                .collect(Collectors.joining(",")));
+            if (reassignment != null) {
+                System.out.print("\tAdding Replicas: " + 
reassignment.addingReplicas().stream()
+                    .map(node -> node.toString())
+                    .collect(Collectors.joining(",")));
+                System.out.print("\tRemoving Replicas: " + 
reassignment.removingReplicas().stream()
+                    .map(node -> node.toString())
+                    .collect(Collectors.joining(",")));
+            }
+            System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : 
"");
+            System.out.println();
+        }
+
+    }
+
+    static class DescribeOptions {
+        private final TopicCommandOptions opts;
+        private final Set<Integer> liveBrokers;
+        private final boolean describeConfigs;
+        private final boolean describePartitions;
+
+        public DescribeOptions(TopicCommandOptions opts, Set<Integer> 
liveBrokers) {
+            this.opts = opts;
+            this.liveBrokers = liveBrokers;
+            this.describeConfigs = !opts.reportUnavailablePartitions() &&
+                !opts.reportUnderReplicatedPartitions() &&
+                !opts.reportUnderMinIsrPartitions() &&
+                !opts.reportAtMinIsrPartitions();
+            this.describePartitions = !opts.reportOverriddenConfigs();
+        }
+
+        private boolean 
shouldPrintUnderReplicatedPartitions(PartitionDescription partitionDescription) 
{
+            return opts.reportUnderReplicatedPartitions() && 
partitionDescription.isUnderReplicated();
+        }
+
+        private boolean shouldPrintUnavailablePartitions(PartitionDescription 
partitionDescription) {
+            return opts.reportUnavailablePartitions() && 
partitionDescription.hasUnavailablePartitions(liveBrokers);
+        }
+
+        private boolean shouldPrintUnderMinIsrPartitions(PartitionDescription 
partitionDescription) {
+            return opts.reportUnderMinIsrPartitions() && 
partitionDescription.isUnderMinIsr();
+        }
+
+        private boolean shouldPrintAtMinIsrPartitions(PartitionDescription 
partitionDescription) {
+            return opts.reportAtMinIsrPartitions() && 
partitionDescription.isAtMinIsrPartitions();
+        }
+
+        private boolean shouldPrintTopicPartition(PartitionDescription 
partitionDesc) {
+            return describeConfigs ||
+                shouldPrintUnderReplicatedPartitions(partitionDesc) ||
+                shouldPrintUnavailablePartitions(partitionDesc) ||
+                shouldPrintUnderMinIsrPartitions(partitionDesc) ||
+                shouldPrintAtMinIsrPartitions(partitionDesc);
+        }
+
+        public void maybePrintPartitionDescription(PartitionDescription desc) {
+            if (shouldPrintTopicPartition(desc)) {
+                desc.printDescription();
+            }
+        }
+    }
+
+    public static class TopicService implements AutoCloseable {
+        private Admin adminClient;
+
+        public TopicService(Properties commandConfig, Optional<String> 
bootstrapServer) {
+            this.adminClient = createAdminClient(commandConfig, 
bootstrapServer);
+        }
+
+        public TopicService(Admin admin) {
+            this.adminClient = admin;
+        }
+
+        private static Admin createAdminClient(Properties commandConfig, 
Optional<String> bootstrapServer) {
+            if (bootstrapServer.isPresent()) {
+                commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);

Review Comment:
   Good catch. I updated the integration test to use --bootstrap-server. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to