OmniaGM commented on code in PR #13201: URL: https://github.com/apache/kafka/pull/13201#discussion_r1297475863
########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -0,0 +1,1019 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.CreatePartitionsOptions; +import org.apache.kafka.clients.admin.CreateTopicsOptions; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.DeleteTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicCollection; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.ClusterAuthorizationException; +import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class TopicCommand { + private static final Logger log = LoggerFactory.getLogger(TopicCommand.class); + + public static void main(String[] args) { + Exit.exit(mainNoExit(args)); + } + + private static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + TopicCommandOptions opts = new TopicCommandOptions(args); + opts.checkArgs(); + TopicService topicService = new TopicService(opts.commandConfig(), opts.bootstrapServer()); + int exitCode = 0; + try { + if (opts.hasCreateOption()) { + topicService.createTopic(opts); + } else if (opts.hasAlterOption()) { + topicService.alterTopic(opts); + } else if (opts.hasListOption()) { + topicService.listTopics(opts); + } else if (opts.hasDescribeOption()) { + topicService.describeTopic(opts); + } else if (opts.hasDeleteOption()) { + topicService.deleteTopic(opts); + } + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + printException(cause); + } else { + printException(e); + } + exitCode = 1; + } catch (Throwable e) { + printException(e); + exitCode = 1; + } finally { + topicService.close(); + Exit.exit(exitCode); + } + + } + + private static void printException(Throwable e) { + System.out.println("Error while executing topic command : " + e.getMessage()); + log.error(Utils.stackTrace(e)); + } + + @SuppressWarnings("deprecation") + static Map<Integer, List<Integer>> parseReplicaAssignment(String replicaAssignmentList) { + String[] partitionList = replicaAssignmentList.split(","); + Map<Integer, List<Integer>> ret = new LinkedHashMap<Integer, List<Integer>>(); + for (int i = 0; i < partitionList.length; i++) { + List<Integer> brokerList = Arrays.stream(partitionList[i].split(":")) + .map(String::trim) + .mapToInt(Integer::parseInt) + .boxed() + .collect(Collectors.toList()); + Collection<Integer> duplicateBrokers = ToolsUtils.duplicates(brokerList); + if (!duplicateBrokers.isEmpty()) { + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: " + + duplicateBrokers.stream() + .map(Object::toString) + .collect(Collectors.joining(",")) + ); + } + ret.put(i, brokerList); + if (ret.get(i).size() != ret.get(0).size()) { + throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList); + } + } + return ret; + } + + @SuppressWarnings("deprecation") + private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) { + List<List<String>> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList()) + .stream() + .map(s -> Arrays.asList(s.split("\\s*=\\s*"))) + .collect(Collectors.toList()); + + if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) { + throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\"."); + } + + Properties props = new Properties(); + configsToBeAdded.stream() + .forEach(pair -> props.setProperty(pair.get(0).trim(), pair.get(1).trim())); + LogConfig.validate(props); + if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) { + System.out.println("WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " + + "This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " + + "if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0."); + } + return props; + } + + // It is possible for a reassignment to complete between the time we have fetched its state and the time + // we fetch partition metadata. In this case, we ignore the reassignment when determining replication factor. + public static boolean isReassignmentInProgress(TopicPartitionInfo tpi, PartitionReassignment ra) { + // Reassignment is still in progress as long as the removing and adding replicas are still present + Set<Integer> allReplicaIds = tpi.replicas().stream().map(Node::id).collect(Collectors.toSet()); + Set<Integer> changingReplicaIds = new HashSet<>(); + if (ra != null) { + changingReplicaIds.addAll(ra.removingReplicas()); + changingReplicaIds.addAll(ra.addingReplicas()); + } + return allReplicaIds.stream().anyMatch(changingReplicaIds::contains); + + } + + private static Integer getReplicationFactor(TopicPartitionInfo tpi, PartitionReassignment reassignment) { + return isReassignmentInProgress(tpi, reassignment) ? + reassignment.replicas().size() - reassignment.addingReplicas().size() : + tpi.replicas().size(); + } + + /** + * ensures topic existence and throws exception if topic doesn't exist + * + * @param foundTopics Topics that were found to match the requested topic name. + * @param requestedTopic Name of the topic that was requested. + * @param requireTopicExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested name does not exist. + */ + private static void ensureTopicExists(List<String> foundTopics, String requestedTopic, Boolean requireTopicExists) { + // If no topic name was mentioned, do not need to throw exception. + if (!(requestedTopic.isEmpty() || !Optional.ofNullable(requestedTopic).isPresent()) && requireTopicExists && foundTopics.isEmpty()) { + // If given topic doesn't exist then throw exception + throw new IllegalArgumentException(String.format("Topic '%s' does not exist as expected", requestedTopic)); + } + } + + private static List<String> doGetTopics(List<String> allTopics, Optional<String> topicIncludeList, Boolean excludeInternalTopics) { + if (topicIncludeList.isPresent()) { + IncludeList topicsFilter = new IncludeList(topicIncludeList.get()); + return allTopics.stream() + .filter(topic -> topicsFilter.isTopicAllowed(topic, excludeInternalTopics)) + .collect(Collectors.toList()); + } else { + return allTopics.stream() + .filter(topic -> !(Topic.isInternal(topic) && excludeInternalTopics)) + .collect(Collectors.toList()); + } + } + + /** + * ensures topic existence and throws exception if topic doesn't exist + * + * @param foundTopicIds Topics that were found to match the requested topic id. + * @param requestedTopicId Id of the topic that was requested. + * @param requireTopicIdExists Indicates if the topic needs to exist for the operation to be successful. + * If set to true, the command will throw an exception if the topic with the + * requested id does not exist. + */ + private static void ensureTopicIdExists(List<Uuid> foundTopicIds, Uuid requestedTopicId, Boolean requireTopicIdExists) { + // If no topic id was mentioned, do not need to throw exception. + if (requestedTopicId != null && requireTopicIdExists && foundTopicIds.isEmpty()) { + // If given topicId doesn't exist then throw exception + throw new IllegalArgumentException(String.format("TopicId '%s' does not exist as expected", requestedTopicId)); + } + } + + static class CommandTopicPartition { + private final TopicCommandOptions opts; + private final Optional<String> name; + private final Optional<Integer> replicationFactor; + private final Optional<Integer> partitions; + private final Map<Integer, List<Integer>> replicaAssignment; + private final Properties configsToAdd; + + public CommandTopicPartition(TopicCommandOptions options) { + opts = options; + name = options.topic(); + partitions = options.partitions(); + replicationFactor = options.replicationFactor(); + replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap()); + configsToAdd = parseTopicConfigsToBeAdded(options); + } + + public Boolean hasReplicaAssignment() { + return !replicaAssignment.isEmpty(); + } + + public Boolean hasPartitions() { + return partitions.isPresent(); + } + + public Boolean ifTopicDoesntExist() { + return opts.ifNotExists(); + } + } + + static class TopicDescription { + private final String topic; + private final Uuid topicId; + private final Integer numPartitions; + private final Integer replicationFactor; + private final Config config; + private final Boolean markedForDeletion; + + public TopicDescription(String topic, Uuid topicId, Integer numPartitions, Integer replicationFactor, Config config, Boolean markedForDeletion) { + this.topic = topic; + this.topicId = topicId; + this.numPartitions = numPartitions; + this.replicationFactor = replicationFactor; + this.config = config; + this.markedForDeletion = markedForDeletion; + } + + public void printDescription() { + String configsAsString = config.entries().stream() + .filter(config -> !config.isDefault()) + .map(ce -> String.format("%s=%s", ce.name(), ce.value())) + .collect(Collectors.joining(",")); + System.out.print(String.format("Topic: %s", topic)); + if (topicId != Uuid.ZERO_UUID) + System.out.print(String.format("\tTopicId: %s", topicId)); + System.out.print(String.format("\tPartitionCount: %s", numPartitions)); + System.out.print(String.format("\tReplicationFactor: %s", replicationFactor)); + System.out.print(String.format("\tConfigs: %s", configsAsString)); + System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : ""); + System.out.println(); + } + } + + static class PartitionDescription { + private final String topic; + private final TopicPartitionInfo info; + private final Config config; + private final Boolean markedForDeletion; + private final PartitionReassignment reassignment; + + PartitionDescription(String topic, + TopicPartitionInfo info, + Config config, + Boolean markedForDeletion, + PartitionReassignment reassignment) { + this.topic = topic; + this.info = info; + this.config = config; + this.markedForDeletion = markedForDeletion; + this.reassignment = reassignment; + } + + public Integer minIsrCount() { + return Integer.parseInt(config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value()); + } + + public Boolean isUnderReplicated() { + return getReplicationFactor(info, reassignment) - info.isr().size() > 0; + } + + public boolean hasLeader() { + return info.leader() != null; + } + + public Boolean isUnderMinIsr() { + return !hasLeader() || info.isr().size() < minIsrCount(); + } + + public Boolean isAtMinIsrPartitions() { + return minIsrCount() == info.isr().size(); + + } + + public Boolean hasUnavailablePartitions(Set<Integer> liveBrokers) { + return !hasLeader() || !liveBrokers.contains(info.leader().id()); + } + + public void printDescription() { + System.out.print("\tTopic: " + topic); + System.out.print("\tPartition: " + info.partition()); + System.out.print("\tLeader: " + (hasLeader() ? info.leader().id() : "none")); + System.out.print("\tReplicas: " + info.replicas().stream() + .map(node -> Integer.toString(node.id())) + .collect(Collectors.joining(","))); + System.out.print("\tIsr: " + info.isr().stream() + .map(node -> Integer.toString(node.id())) + .collect(Collectors.joining(","))); + if (reassignment != null) { + System.out.print("\tAdding Replicas: " + reassignment.addingReplicas().stream() + .map(node -> node.toString()) + .collect(Collectors.joining(","))); + System.out.print("\tRemoving Replicas: " + reassignment.removingReplicas().stream() + .map(node -> node.toString()) + .collect(Collectors.joining(","))); + } + System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : ""); + System.out.println(); + } + + } + + static class DescribeOptions { + private final TopicCommandOptions opts; + private final Set<Integer> liveBrokers; + private final boolean describeConfigs; + private final boolean describePartitions; + + public DescribeOptions(TopicCommandOptions opts, Set<Integer> liveBrokers) { + this.opts = opts; + this.liveBrokers = liveBrokers; + this.describeConfigs = !opts.reportUnavailablePartitions() && + !opts.reportUnderReplicatedPartitions() && + !opts.reportUnderMinIsrPartitions() && + !opts.reportAtMinIsrPartitions(); + this.describePartitions = !opts.reportOverriddenConfigs(); + } + + private boolean shouldPrintUnderReplicatedPartitions(PartitionDescription partitionDescription) { + return opts.reportUnderReplicatedPartitions() && partitionDescription.isUnderReplicated(); + } + + private boolean shouldPrintUnavailablePartitions(PartitionDescription partitionDescription) { + return opts.reportUnavailablePartitions() && partitionDescription.hasUnavailablePartitions(liveBrokers); + } + + private boolean shouldPrintUnderMinIsrPartitions(PartitionDescription partitionDescription) { + return opts.reportUnderMinIsrPartitions() && partitionDescription.isUnderMinIsr(); + } + + private boolean shouldPrintAtMinIsrPartitions(PartitionDescription partitionDescription) { + return opts.reportAtMinIsrPartitions() && partitionDescription.isAtMinIsrPartitions(); + } + + private boolean shouldPrintTopicPartition(PartitionDescription partitionDesc) { + return describeConfigs || + shouldPrintUnderReplicatedPartitions(partitionDesc) || + shouldPrintUnavailablePartitions(partitionDesc) || + shouldPrintUnderMinIsrPartitions(partitionDesc) || + shouldPrintAtMinIsrPartitions(partitionDesc); + } + + public void maybePrintPartitionDescription(PartitionDescription desc) { + if (shouldPrintTopicPartition(desc)) { + desc.printDescription(); + } + } + } + + public static class TopicService implements AutoCloseable { + private Admin adminClient; + + public TopicService(Properties commandConfig, Optional<String> bootstrapServer) { + this.adminClient = createAdminClient(commandConfig, bootstrapServer); + } + + public TopicService(Admin admin) { + this.adminClient = admin; + } + + private static Admin createAdminClient(Properties commandConfig, Optional<String> bootstrapServer) { + if (bootstrapServer.isPresent()) { + commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); Review Comment: Good catch. I updated the integration test to use --bootstrap-server. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org