nizhikov commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1345657229
########## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java: ########## @@ -0,0 +1,1501 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.reassign; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import joptsimple.OptionSpec; +import org.apache.kafka.admin.AdminUtils; +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.clients.admin.PartitionReassignment; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.config.ConfigResource; +import 
org.apache.kafka.common.errors.ReplicaNotAvailableException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.AdminCommandFailedException; +import org.apache.kafka.server.common.AdminOperationException; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; +import org.apache.kafka.tools.TerseException; +import org.apache.kafka.tools.ToolsUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ReassignPartitionsCommand { + private static final String ANY_LOG_DIR = "any"; + + static final String HELP_TEXT = "This tool helps to move topic partitions between replicas."; + + private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); + + private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString(); + + private static final DecodeJson<List<Integer>> INT_LIST = DecodeJson.decodeList(INT); + + private static final DecodeJson<List<String>> STRING_LIST = DecodeJson.decodeList(STRING); + + /** + * The earliest version of the partition reassignment JSON. 
We will default to this + * version if no other version number is given. + */ + static final int EARLIEST_VERSION = 1; + + /** + * The earliest version of the JSON for each partition reassignment topic. We will + * default to this version if no other version number is given. + */ + static final int EARLIEST_TOPICS_JSON_VERSION = 1; + + // Throttles that are set at the level of an individual broker. + //DynamicConfig.Broker.LeaderReplicationThrottledRateProp + static final String BROKER_LEVEL_LEADER_THROTTLE = "leader.replication.throttled.rate"; + //DynamicConfig.Broker.FollowerReplicationThrottledRateProp + static final String BROKER_LEVEL_FOLLOWER_THROTTLE = "follower.replication.throttled.rate"; + //DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp + static final String BROKER_LEVEL_LOG_DIR_THROTTLE = "replica.alter.log.dirs.io.max.bytes.per.second"; + static final List<String> BROKER_LEVEL_THROTTLES = Arrays.asList( + BROKER_LEVEL_LEADER_THROTTLE, + BROKER_LEVEL_FOLLOWER_THROTTLE, + BROKER_LEVEL_LOG_DIR_THROTTLE + ); + + // Throttles that are set at the level of an individual topic. + //LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG + static final String TOPIC_LEVEL_LEADER_THROTTLE = "leader.replication.throttled.replicas"; + //LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG + static final String TOPIC_LEVEL_FOLLOWER_THROTTLE = "follower.replication.throttled.replicas"; + private static final List<String> TOPIC_LEVEL_THROTTLES = Arrays.asList( + TOPIC_LEVEL_LEADER_THROTTLE, + TOPIC_LEVEL_FOLLOWER_THROTTLE + ); + + private static final String CANNOT_EXECUTE_BECAUSE_OF_EXISTING_MESSAGE = "Cannot execute because " + + "there is an existing partition assignment. Use --additional to override this and " + + "create a new partition assignment in addition to the existing one. 
The --additional " + + "flag can also be used to change the throttle by resubmitting the current reassignment."; + + private static final String YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE = "Warning: You must run " + + "--verify periodically, until the reassignment completes, to ensure the throttle " + + "is removed."; + + public static void main(String[] args) { + ReassignPartitionsCommandOptions opts = validateAndParseArgs(args); + boolean failed = true; + Admin adminClient = null; + + try { + Properties props = opts.options.has(opts.commandConfigOpt) + ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) + : new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool"); + adminClient = Admin.create(props); + handleAction(adminClient, opts); + failed = false; + } catch (TerseException e) { + System.out.println(e.getMessage()); + } catch (Throwable e) { + System.out.println("Error: " + e.getMessage()); + System.out.println(Utils.stackTrace(e)); + } finally { + // It's good to do this after printing any error stack trace. + if (adminClient != null) { + adminClient.close(); + } + } + // If the command failed, exit with a non-zero exit code. 
+ if (failed) { + Exit.exit(1); + } + } + + private static void handleAction(Admin adminClient, ReassignPartitionsCommandOptions opts) throws IOException, ExecutionException, InterruptedException, TerseException { + if (opts.options.has(opts.verifyOpt)) { + verifyAssignment(adminClient, + Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)), + opts.options.has(opts.preserveThrottlesOpt)); + } else if (opts.options.has(opts.generateOpt)) { + generateAssignment(adminClient, + Utils.readFileAsString(opts.options.valueOf(opts.topicsToMoveJsonFileOpt)), + opts.options.valueOf(opts.brokerListOpt), + !opts.options.has(opts.disableRackAware)); + } else if (opts.options.has(opts.executeOpt)) { + executeAssignment(adminClient, + opts.options.has(opts.additionalOpt), + Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)), + opts.options.valueOf(opts.interBrokerThrottleOpt), + opts.options.valueOf(opts.replicaAlterLogDirsThrottleOpt), + opts.options.valueOf(opts.timeoutOpt), + Time.SYSTEM); + } else if (opts.options.has(opts.cancelOpt)) { + cancelAssignment(adminClient, + Utils.readFileAsString(opts.options.valueOf(opts.reassignmentJsonFileOpt)), + opts.options.has(opts.preserveThrottlesOpt), + opts.options.valueOf(opts.timeoutOpt), + Time.SYSTEM); + } else if (opts.options.has(opts.listOpt)) { + listReassignments(adminClient); + } else { + throw new RuntimeException("Unsupported action."); + } + } + + /** + * The entry point for the --verify command. + * + * @param adminClient The AdminClient to use. + * @param jsonString The JSON string to use for the topics and partitions to verify. + * @param preserveThrottles True if we should avoid changing topic or broker throttles. + * + * @return A result that is useful for testing. 
+ */ + static VerifyAssignmentResult verifyAssignment(Admin adminClient, String jsonString, Boolean preserveThrottles) throws ExecutionException, InterruptedException, JsonProcessingException { + Tuple2<List<Tuple2<TopicPartition, List<Integer>>>, Map<TopicPartitionReplica, String>> t0 = parsePartitionReassignmentData(jsonString); + + List<Tuple2<TopicPartition, List<Integer>>> targetParts = t0.v1; + Map<TopicPartitionReplica, String> targetLogDirs = t0.v2; + + Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> t1 = verifyPartitionAssignments(adminClient, targetParts); + + Map<TopicPartition, PartitionReassignmentState> partStates = t1.v1; + Boolean partsOngoing = t1.v2; + + Tuple2<Map<TopicPartitionReplica, LogDirMoveState>, Boolean> t2 = verifyReplicaMoves(adminClient, targetLogDirs); + + Map<TopicPartitionReplica, LogDirMoveState> moveStates = t2.v1; + Boolean movesOngoing = t2.v2; + + if (!partsOngoing && !movesOngoing && !preserveThrottles) { + // If the partition assignments and replica assignments are done, clear any throttles + // that were set. We have to clear all throttles, because we don't have enough + // information to know all of the source brokers that might have been involved in the + // previous reassignments. + clearAllThrottles(adminClient, targetParts); + } + + return new VerifyAssignmentResult(partStates, partsOngoing, moveStates, movesOngoing); + } + + /** + * Verify the partition reassignments specified by the user. + * + * @param adminClient The AdminClient to use. + * @param targets The partition reassignments specified by the user. + * + * @return A tuple of the partition reassignment states, and a + * boolean which is true if there are any ongoing + * reassignments (including reassignments not described + * in the JSON file.) 
+ */ + private static Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> verifyPartitionAssignments(Admin adminClient, + List<Tuple2<TopicPartition, List<Integer>>> targets) throws ExecutionException, InterruptedException { + Tuple2<Map<TopicPartition, PartitionReassignmentState>, Boolean> t0 = findPartitionReassignmentStates(adminClient, targets); + + Map<TopicPartition, PartitionReassignmentState> partStates = t0.v1; + Boolean partsOngoing = t0.v2; + + System.out.println(partitionReassignmentStatesToString(partStates)); + return new Tuple2<>(partStates, partsOngoing); Review Comment: Good catch. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org