nizhikov commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1346115686


##########
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##########
@@ -0,0 +1,1523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import joptsimple.OptionSpec;
+import org.apache.kafka.admin.AdminUtils;
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.PartitionReassignment;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.ReplicaNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+import org.apache.kafka.tools.TerseException;
+import org.apache.kafka.tools.ToolsUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+@SuppressWarnings("ClassDataAbstractionCoupling")
+public class ReassignPartitionsCommand {
+    private static final String ANY_LOG_DIR = "any";
+
+    static final String HELP_TEXT = "This tool helps to move topic partitions 
between replicas.";
+
+    private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+
+    private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+    private static final DecodeJson<List<Integer>> INT_LIST = 
DecodeJson.decodeList(INT);
+
+    private static final DecodeJson<List<String>> STRING_LIST = 
DecodeJson.decodeList(STRING);
+
+    /**
+     * The earliest version of the partition reassignment JSON.  We will 
default to this
+     * version if no other version number is given.
+     */
+    static final int EARLIEST_VERSION = 1;
+
+    /**
+     * The earliest version of the JSON for each partition reassignment topic. 
 We will
+     * default to this version if no other version number is given.
+     */
+    static final int EARLIEST_TOPICS_JSON_VERSION = 1;
+
+    // Throttles that are set at the level of an individual broker.
+    //DynamicConfig.Broker.LeaderReplicationThrottledRateProp
+    static final String BROKER_LEVEL_LEADER_THROTTLE = 
"leader.replication.throttled.rate";
+    //DynamicConfig.Broker.FollowerReplicationThrottledRateProp
+    static final String BROKER_LEVEL_FOLLOWER_THROTTLE = 
"follower.replication.throttled.rate";
+    //DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp
+    static final String BROKER_LEVEL_LOG_DIR_THROTTLE = 
"replica.alter.log.dirs.io.max.bytes.per.second";
+    static final List<String> BROKER_LEVEL_THROTTLES = Arrays.asList(
+        BROKER_LEVEL_LEADER_THROTTLE,
+        BROKER_LEVEL_FOLLOWER_THROTTLE,
+        BROKER_LEVEL_LOG_DIR_THROTTLE
+    );
+
+    // Throttles that are set at the level of an individual topic.
+    //LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG
+    static final String TOPIC_LEVEL_LEADER_THROTTLE = 
"leader.replication.throttled.replicas";
+    //LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
+    static final String TOPIC_LEVEL_FOLLOWER_THROTTLE = 
"follower.replication.throttled.replicas";
+    private static final List<String> TOPIC_LEVEL_THROTTLES = Arrays.asList(
+        TOPIC_LEVEL_LEADER_THROTTLE,
+        TOPIC_LEVEL_FOLLOWER_THROTTLE
+    );
+
+    private static final String CANNOT_EXECUTE_BECAUSE_OF_EXISTING_MESSAGE = 
"Cannot execute because " +
+        "there is an existing partition assignment.  Use --additional to 
override this and " +
+        "create a new partition assignment in addition to the existing one. 
The --additional " +
+        "flag can also be used to change the throttle by resubmitting the 
current reassignment.";
+
+    private static final String YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE = 
"Warning: You must run " +
+        "--verify periodically, until the reassignment completes, to ensure 
the throttle " +
+        "is removed.";
+
+    public static void main(String[] args) {
+        ReassignPartitionsCommandOptions opts = validateAndParseArgs(args);
+        boolean failed = true;
+        Admin adminClient = null;
+
+        try {
+            Properties props = opts.options.has(opts.commandConfigOpt)
+                ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+                : new Properties();
+            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
opts.options.valueOf(opts.bootstrapServerOpt));
+            props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, 
"reassign-partitions-tool");
+            adminClient = Admin.create(props);
+            handleAction(adminClient, opts);
+            failed = false;
+        } catch (TerseException e) {
+            System.out.println(e.getMessage());
+        } catch (Throwable e) {
+            System.out.println("Error: " + e.getMessage());
+            System.out.println(Utils.stackTrace(e));
+        } finally {
+            // It's good to do this after printing any error stack trace.
+            if (adminClient != null) {
+                adminClient.close();
+            }
+        }
+        // If the command failed, exit with a non-zero exit code.
+        if (failed) {
+            Exit.exit(1);
+        }
+    }
+
+    private static void handleAction(Admin adminClient, 
ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, 
InterruptedException, TerseException {
+        if (opts.options.has(opts.verifyOpt)) {
+            verifyAssignment(adminClient,
+                
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
+                opts.options.has(opts.preserveThrottlesOpt));
+        } else if (opts.options.has(opts.generateOpt)) {
+            generateAssignment(adminClient,
+                
Utils.readFileAsString(opts.options.valueOf(opts.topicsToMoveJsonFileOpt)),
+                opts.options.valueOf(opts.brokerListOpt),
+                !opts.options.has(opts.disableRackAware));
+        } else if (opts.options.has(opts.executeOpt)) {
+            executeAssignment(adminClient,
+                opts.options.has(opts.additionalOpt),
+                
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
+                opts.options.valueOf(opts.interBrokerThrottleOpt),
+                opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt),
+                opts.options.valueOf(opts.timeoutOpt),
+                Time.SYSTEM);
+        } else if (opts.options.has(opts.cancelOpt)) {
+            cancelAssignment(adminClient,
+                
Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)),
+                opts.options.has(opts.preserveThrottlesOpt),
+                opts.options.valueOf(opts.timeoutOpt),
+                Time.SYSTEM);
+        } else if (opts.options.has(opts.listOpt)) {
+            listReassignments(adminClient);
+        } else {
+            throw new RuntimeException("Unsupported action.");
+        }
+    }
+
+    /**
+     * The entry point for the --verify command.
+     *
+     * @param adminClient           The AdminClient to use.
+     * @param jsonString            The JSON string to use for the topics and 
partitions to verify.
+     * @param preserveThrottles     True if we should avoid changing topic or 
broker throttles.
+     *
+     * @return                      A result that is useful for testing.
+     */
+    static VerifyAssignmentResult verifyAssignment(Admin adminClient,
+                                                   String jsonString,
+                                                   Boolean preserveThrottles
+    ) throws ExecutionException, InterruptedException, JsonProcessingException 
{
+        Tuple2<List<Tuple2<TopicPartition, List<Integer>>>, 
Map<TopicPartitionReplica, String>> t0 = 
parsePartitionReassignmentData(jsonString);
+
+        List<Tuple2<TopicPartition, List<Integer>>> targetParts = t0.v1;
+        Map<TopicPartitionReplica, String> targetLogDirs = t0.v2;
+
+        Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> t1 = 
verifyPartitionAssignments(adminClient, targetParts);
+
+        Map<TopicPartition, PartitionReassignmentState> partStates = t1.v1;
+        Boolean partsOngoing = t1.v2;
+
+        Tuple2<Map<TopicPartitionReplica, LogDirMoveState>, Boolean> t2 = 
verifyReplicaMoves(adminClient, targetLogDirs);
+
+        Map<TopicPartitionReplica, LogDirMoveState> moveStates = t2.v1;
+        Boolean movesOngoing = t2.v2;
+
+        if (!partsOngoing && !movesOngoing && !preserveThrottles) {
+            // If the partition assignments and replica assignments are done, 
clear any throttles
+            // that were set.  We have to clear all throttles, because we 
don't have enough
+            // information to know all of the source brokers that might have 
been involved in the
+            // previous reassignments.
+            clearAllThrottles(adminClient, targetParts);
+        }
+
+        return new VerifyAssignmentResult(partStates, partsOngoing, 
moveStates, movesOngoing);
+    }
+
+    /**
+     * Verify the partition reassignments specified by the user.
+     *
+     * @param adminClient           The AdminClient to use.
+     * @param targets               The partition reassignments specified by 
the user.
+     *
+     * @return                      A tuple of the partition reassignment 
states, and a
+     *                              boolean which is true if there are no 
ongoing
+     *                              reassignments (including reassignments not 
described
+     *                              in the JSON file.)
+     */
+    private static Tuple2<Map<TopicPartition, PartitionReassignmentState>, 
Boolean> verifyPartitionAssignments(Admin adminClient,
+                                                                               
                                List<Tuple2<TopicPartition, List<Integer>>> 
targets
+    ) throws ExecutionException, InterruptedException {
+        Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> t0 = 
findPartitionReassignmentStates(adminClient, targets);
+        System.out.println(partitionReassignmentStatesToString(t0.v1));
+        return t0;
+    }
+
+    static int compareTopicPartitions(TopicPartition a, TopicPartition b) {
+        int topicOrder = Objects.compare(a.topic(), b.topic(), 
String::compareTo);
+        return topicOrder == 0 ? Integer.compare(a.partition(), b.partition()) 
: topicOrder;
+    }
+
+    static int compareTopicPartitionReplicas(TopicPartitionReplica a, 
TopicPartitionReplica b) {
+        int brokerOrder =  Integer.compare(a.brokerId(), b.brokerId());
+
+        if (brokerOrder != 0)
+            return brokerOrder;
+
+        int topicOrder = Objects.compare(a.topic(), b.topic(), 
String::compareTo);
+        return topicOrder == 0 ? Integer.compare(a.partition(), b.partition()) 
: topicOrder;
+    }
+
+    /**
+     * Convert partition reassignment states to a human-readable string.
+     *
+     * @param states      A map from topic partitions to states.
+     * @return            A string summarizing the partition reassignment 
states.
+     */
+    static String partitionReassignmentStatesToString(Map<TopicPartition, 
PartitionReassignmentState> states) {
+        List<String> bld = new ArrayList<>();
+        bld.add("Status of partition reassignment:");
+        
states.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitions).forEach(topicPartition
 -> {
+            PartitionReassignmentState state = states.get(topicPartition);
+            if (state.done) {
+                if (state.currentReplicas.equals(state.targetReplicas)) {
+                    bld.add(String.format("Reassignment of partition %s is 
completed.", topicPartition));
+                } else {
+                    String currentReplicaStr = 
state.currentReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
+                    String targetReplicaStr = 
state.targetReplicas.stream().map(String::valueOf).collect(Collectors.joining(","));
+
+                    bld.add("There is no active reassignment of partition " + 
topicPartition + ", " +
+                        "but replica set is " + currentReplicaStr + " rather 
than " +
+                        targetReplicaStr + ".");
+                }
+            } else {
+                bld.add(String.format("Reassignment of partition %s is still 
in progress.", topicPartition));
+            }
+        });
+        return 
bld.stream().collect(Collectors.joining(System.lineSeparator()));
+    }
+
+    /**
+     * Find the state of the specified partition reassignments.
+     *
+     * @param adminClient          The Admin client to use.
+     * @param targetReassignments  The reassignments we want to learn about.
+     *
+     * @return                     A tuple containing the reassignment states 
for each topic
+     *                             partition, plus whether there are any 
ongoing reassignments.
+     */
+    static Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> 
findPartitionReassignmentStates(Admin adminClient,
+                                                                               
                             List<Tuple2<TopicPartition,
+                                                                               
                             List<Integer>>> targetReassignments
+    ) throws ExecutionException, InterruptedException {
+        Map<TopicPartition, PartitionReassignment> currentReassignments = 
adminClient.
+            listPartitionReassignments().reassignments().get();
+
+        List<Tuple2<TopicPartition, List<Integer>>> foundReassignments = new 
ArrayList<>();
+        List<Tuple2<TopicPartition, List<Integer>>> notFoundReassignments = 
new ArrayList<>();
+
+        targetReassignments.forEach(reassignment -> {
+            if (currentReassignments.containsKey(reassignment.v1))
+                foundReassignments.add(reassignment);
+            else
+                notFoundReassignments.add(reassignment);
+        });
+
+        List<Tuple2<TopicPartition, PartitionReassignmentState>> foundResults 
= foundReassignments.stream().map(e -> {
+            TopicPartition part = e.v1;
+            List<Integer> targetReplicas = e.v2;
+            return new Tuple2<>(part,
+                new PartitionReassignmentState(
+                    currentReassignments.get(part).replicas(),
+                    targetReplicas,
+                    false));
+        }).collect(Collectors.toList());
+
+        Set<String> topicNamesToLookUp = notFoundReassignments.stream()
+            .map(e -> e.v1)
+            .filter(part -> !currentReassignments.containsKey(part))
+            .map(TopicPartition::topic)
+            .collect(Collectors.toSet());
+
+        Map<String, KafkaFuture<TopicDescription>> topicDescriptions = 
adminClient.
+            describeTopics(topicNamesToLookUp).topicNameValues();
+
+        List<Tuple2<TopicPartition, PartitionReassignmentState>> 
notFoundResults = new ArrayList<>();
+        for (Tuple2<TopicPartition, List<Integer>> e : notFoundReassignments) {
+            TopicPartition part = e.v1;
+            List<Integer> targetReplicas = e.v2;
+
+            if (currentReassignments.containsKey(part)) {
+                PartitionReassignment reassignment = 
currentReassignments.get(part);
+                notFoundResults.add(new Tuple2<>(part, new 
PartitionReassignmentState(
+                    reassignment.replicas(),
+                    targetReplicas,
+                    false)));
+            } else {
+                notFoundResults.add(new Tuple2<>(part, 
topicDescriptionFutureToState(part.partition(),
+                    topicDescriptions.get(part.topic()), targetReplicas)));
+            }
+        }
+
+        Map<TopicPartition, PartitionReassignmentState> allResults = new 
HashMap<>();
+        foundResults.forEach(e -> allResults.put(e.v1, e.v2));
+        notFoundResults.forEach(e -> allResults.put(e.v1, e.v2));
+
+        return new Tuple2<>(allResults, currentReassignments.size() > 0);
+    }
+
+    private static PartitionReassignmentState 
topicDescriptionFutureToState(int partition,
+                                                                            
KafkaFuture<TopicDescription> future,
+                                                                            
List<Integer> targetReplicas
+    ) throws InterruptedException, ExecutionException {
+        try {
+            TopicDescription topicDescription = future.get();
+            if (topicDescription.partitions().size() < partition) {
+                throw new ExecutionException("Too few partitions found", new 
UnknownTopicOrPartitionException());
+            }
+            return new PartitionReassignmentState(
+                
topicDescription.partitions().get(partition).replicas().stream().map(Node::id).collect(Collectors.toList()),
+                targetReplicas,
+                true);
+        } catch (ExecutionException t) {
+            if (t.getCause() instanceof UnknownTopicOrPartitionException)
+                return new PartitionReassignmentState(Collections.emptyList(), 
targetReplicas, true);
+
+            throw t;
+        }
+    }
+
+    /**
+     * Verify the replica reassignments specified by the user.
+     *
+     * @param adminClient           The AdminClient to use.
+     * @param targetReassignments   The replica reassignments specified by the 
user.
+     *
+     * @return                      A tuple of the replica states, and a 
boolean which is true
+     *                              if there are any ongoing replica moves.
+     *
+     *                              Note: Unlike in 
verifyPartitionAssignments, we will
+     *                              return false here even if there are 
unrelated ongoing
+     *                              reassignments. (We don't have an efficient 
API that
+     *                              returns all ongoing replica reassignments.)
+     */
+    private static Tuple2<Map<TopicPartitionReplica, LogDirMoveState>, 
Boolean> verifyReplicaMoves(Admin adminClient,
+                                                                               
                    Map<TopicPartitionReplica, String> targetReassignments
+    ) throws ExecutionException, InterruptedException {
+        Map<TopicPartitionReplica, LogDirMoveState> moveStates = 
findLogDirMoveStates(adminClient, targetReassignments);
+        System.out.println(replicaMoveStatesToString(moveStates));
+        return new Tuple2<>(moveStates, 
!moveStates.values().stream().allMatch(LogDirMoveState::done));
+    }
+
+    /**
+     * Find the state of the specified partition reassignments.
+     *
+     * @param adminClient           The AdminClient to use.
+     * @param targetMoves           The movements we want to learn about.  The 
map is keyed
+     *                              by TopicPartitionReplica, and its values 
are target log
+     *                              directories.
+     *
+     * @return                      The states for each replica movement.
+     */
+    static Map<TopicPartitionReplica, LogDirMoveState> 
findLogDirMoveStates(Admin adminClient,
+                                                                            
Map<TopicPartitionReplica, String> targetMoves
+    ) throws ExecutionException, InterruptedException {
+        Map<TopicPartitionReplica, 
DescribeReplicaLogDirsResult.ReplicaLogDirInfo> replicaLogDirInfos = adminClient
+            .describeReplicaLogDirs(targetMoves.keySet()).all().get();
+
+        Map<TopicPartitionReplica, LogDirMoveState> res = new HashMap<>();
+
+        targetMoves.entrySet().forEach(e -> {
+            TopicPartitionReplica replica = e.getKey();
+            String targetLogDir = e.getValue();
+
+            LogDirMoveState moveState;
+
+            if (!replicaLogDirInfos.containsKey(replica)) {
+                moveState = new MissingReplicaMoveState(targetLogDir);
+            } else {
+                DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = 
replicaLogDirInfos.get(replica);
+                if (info.getCurrentReplicaLogDir() == null) {
+                    moveState = new MissingLogDirMoveState(targetLogDir);
+                } else if (info.getFutureReplicaLogDir() == null) {
+                    if (info.getCurrentReplicaLogDir().equals(targetLogDir)) {
+                        moveState = new CompletedMoveState(targetLogDir);
+                    } else {
+                        moveState = new 
CancelledMoveState(info.getCurrentReplicaLogDir(), targetLogDir);
+                    }
+                } else {
+                    moveState = new 
ActiveMoveState(info.getCurrentReplicaLogDir(),
+                        targetLogDir,
+                        info.getFutureReplicaLogDir());
+                }
+            }
+            res.put(replica, moveState);
+        });
+
+        return res;
+    }
+
+    /**
+     * Convert replica move states to a human-readable string.
+     *
+     * @param states          A map from topic partition replicas to states.
+     * @return                A tuple of a summary string, and a boolean 
describing
+     *                        whether there are any active replica moves.
+     */
+    static String replicaMoveStatesToString(Map<TopicPartitionReplica, 
LogDirMoveState> states) {
+        List<String> bld = new ArrayList<>();
+        
states.keySet().stream().sorted(ReassignPartitionsCommand::compareTopicPartitionReplicas).forEach(replica
 -> {
+            LogDirMoveState state = states.get(replica);
+            if (state instanceof MissingLogDirMoveState) {
+                bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " is not found " +
+                    "in any live log dir on broker " + replica.brokerId() + ". 
There is likely an " +
+                    "offline log directory on the broker.");
+            } else if (state instanceof MissingReplicaMoveState) {
+                bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " cannot be found " +
+                    "in any live log directory on broker " + 
replica.brokerId() + ".");
+            } else if (state instanceof ActiveMoveState) {
+                String targetLogDir = ((ActiveMoveState) state).targetLogDir;
+                String futureLogDir = ((ActiveMoveState) state).futureLogDir;
+                if (targetLogDir.equals(futureLogDir)) {
+                    bld.add("Reassignment of replica " + replica + " is still 
in progress.");
+                } else {
+                    bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " on broker " +
+                        replica.brokerId() + " is being moved to log dir " + 
futureLogDir + " " +
+                        "instead of " + targetLogDir + ".");
+                }
+            } else if (state instanceof CancelledMoveState) {
+                String targetLogDir = ((CancelledMoveState) 
state).targetLogDir;
+                String currentLogDir = ((CancelledMoveState) 
state).currentLogDir;
+                bld.add("Partition " + replica.topic() + "-" + 
replica.partition() + " on broker " +
+                    replica.brokerId() + " is not being moved from log dir " + 
currentLogDir + " to " +
+                    targetLogDir + ".");
+            } else if (state instanceof CompletedMoveState) {
+                bld.add("Reassignment of replica " + replica + " completed 
successfully.");
+            }
+        });
+
+        return 
bld.stream().collect(Collectors.joining(System.lineSeparator()));
+    }
+
+    /**
+     * Clear all topic-level and broker-level throttles.
+     *
+     * @param adminClient     The AdminClient to use.
+     * @param targetParts     The target partitions loaded from the JSON file.
+     */
+    private static void clearAllThrottles(Admin adminClient,
+                                          List<Tuple2<TopicPartition, 
List<Integer>>> targetParts
+    ) throws ExecutionException, InterruptedException {
+        Set<Integer> activeBrokers = 
adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet());
+        Set<Integer> brokers = new HashSet<>(activeBrokers);
+        targetParts.forEach(t -> brokers.addAll(t.v2));
+
+        System.out.printf("Clearing broker-level throttles on broker%s %s%n",
+            brokers.size() == 1 ? "" : "s", Utils.join(brokers, ","));
+        clearBrokerLevelThrottles(adminClient, brokers);
+
+        Set<String> topics = targetParts.stream().map(t -> 
t.v1.topic()).collect(Collectors.toSet());
+        System.out.printf("Clearing topic-level throttles on topic%s %s%n",
+            topics.size() == 1 ? "" : "s", Utils.join(topics, ","));
+        clearTopicLevelThrottles(adminClient, topics);
+    }
+
+    /**
+     * Clear all throttles which have been set at the broker level.
+     *
+     * @param adminClient       The AdminClient to use.
+     * @param brokers           The brokers to clear the throttles for.
+     */
+    private static void clearBrokerLevelThrottles(Admin adminClient, 
Set<Integer> brokers) throws ExecutionException, InterruptedException {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new 
HashMap<>();
+        brokers.forEach(brokerId -> configOps.put(
+            new ConfigResource(ConfigResource.Type.BROKER, 
brokerId.toString()),
+            BROKER_LEVEL_THROTTLES.stream().map(throttle -> new AlterConfigOp(
+                new ConfigEntry(throttle, null), 
AlterConfigOp.OpType.DELETE)).collect(Collectors.toList())
+        ));
+        adminClient.incrementalAlterConfigs(configOps).all().get();
+    }
+
+    /**
+     * Clear the reassignment throttles for the specified topics.
+     *
+     * @param adminClient           The AdminClient to use.
+     * @param topics                The topics to clear the throttles for.
+     */
+    private static void clearTopicLevelThrottles(Admin adminClient, 
Set<String> topics) throws ExecutionException, InterruptedException {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new 
HashMap<>();
+        topics.forEach(topicName -> configOps.put(
+            new ConfigResource(ConfigResource.Type.TOPIC, topicName),
+            TOPIC_LEVEL_THROTTLES.stream().map(throttle -> new 
AlterConfigOp(new ConfigEntry(throttle, null),
+                AlterConfigOp.OpType.DELETE)).collect(Collectors.toList())
+
+        ));

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to