This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1fd58e30cf3 KAFKA-14595: Move classes from ReassignPartitionsCommand to tools (#14172)
1fd58e30cf3 is described below

commit 1fd58e30cf38587a66c1f7188c7667b555624485
Author: Nikolay <[email protected]>
AuthorDate: Fri Aug 11 15:52:14 2023 +0300

    KAFKA-14595: Move classes from ReassignPartitionsCommand to tools (#14172)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 .../apache/kafka/server/util/json/DecodeJson.java  |   4 +-
 .../kafka/tools/reassign/ActiveMoveState.java      |  60 +++++++
 .../kafka/tools/reassign/CancelledMoveState.java   |  57 +++++++
 .../kafka/tools/reassign/CompletedMoveState.java   |  52 ++++++
 .../kafka/tools/reassign/LogDirMoveState.java      |  28 ++++
 .../tools/reassign/MissingLogDirMoveState.java     |  52 ++++++
 .../tools/reassign/MissingReplicaMoveState.java    |  52 ++++++
 .../apache/kafka/tools/reassign/PartitionMove.java |  52 ++++++
 .../tools/reassign/PartitionReassignmentState.java |  57 +++++++
 .../reassign/ReassignPartitionsCommandOptions.java | 176 +++++++++++++++++++++
 .../tools/reassign/VerifyAssignmentResult.java     |  70 ++++++++
 11 files changed, 658 insertions(+), 2 deletions(-)

diff --git a/server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java b/server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java
index dbc28548a34..e46c85c4d48 100644
--- a/server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java
+++ b/server-common/src/main/java/org/apache/kafka/server/util/json/DecodeJson.java
@@ -96,7 +96,7 @@ public interface DecodeJson<T> {
         };
     }
 
-    static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) throws 
JsonMappingException {
+    static <E> DecodeJson<List<E>> decodeList(DecodeJson<E> decodeJson) {
         return node -> {
             if (node.isArray()) {
                 List<E> result = new ArrayList<>();
@@ -110,7 +110,7 @@ public interface DecodeJson<T> {
         };
     }
 
-    static <V> DecodeJson<Map<String, V>> decodeMap(DecodeJson<V> decodeJson) 
throws JsonMappingException {
+    static <V> DecodeJson<Map<String, V>> decodeMap(DecodeJson<V> decodeJson) {
         return node -> {
             if (node.isObject()) {
                 Map<String, V> result = new HashMap<>();
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
new file mode 100644
index 00000000000..842d46ec587
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the move is in progress.
+ *
+ * <p>Holds three log directory paths that are fixed at construction time.
+ * {@link #done()} always returns {@code false} for this state, and
+ * {@code equals}/{@code hashCode} are derived from all three paths.
+ */
+final class ActiveMoveState implements LogDirMoveState {
+    public final String currentLogDir;
+
+    public final String targetLogDir;
+
+    public final String futureLogDir;
+
+    /**
+     * Stores the three directories as given; no validation or null check is performed.
+     *
+     * @param currentLogDir       The current log directory.
+     * @param targetLogDir        The log directory that we wanted the replica to move to.
+     * @param futureLogDir        The log directory that the replica is moving to.
+     */
+    public ActiveMoveState(String currentLogDir, String targetLogDir, String 
futureLogDir) {
+        this.currentLogDir = currentLogDir;
+        this.targetLogDir = targetLogDir;
+        this.futureLogDir = futureLogDir;
+    }
+
+    @Override
+    public boolean done() {
+        return false;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ActiveMoveState that = (ActiveMoveState) o;
+        return Objects.equals(currentLogDir, that.currentLogDir) && 
Objects.equals(targetLogDir, that.targetLogDir) && Objects.equals(futureLogDir, 
that.futureLogDir);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(currentLogDir, targetLogDir, futureLogDir);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
new file mode 100644
index 00000000000..f405eedd403
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where there is no move in progress, but we did not
+ * reach the target log directory.
+ *
+ * <p>{@link #done()} returns {@code true} for this state even though the target was not
+ * reached. Equality and hash code are based on the current and target directories.
+ */
+final class CancelledMoveState implements LogDirMoveState {
+    public final String currentLogDir;
+
+    public final String targetLogDir;
+
+    /**
+     * Stores both directories as given; no validation or null check is performed.
+     *
+     * @param currentLogDir       The current log directory.
+     * @param targetLogDir        The log directory that we wanted the replica to move to.
+     */
+    public CancelledMoveState(String currentLogDir, String targetLogDir) {
+        this.currentLogDir = currentLogDir;
+        this.targetLogDir = targetLogDir;
+    }
+
+    @Override
+    public boolean done() {
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CancelledMoveState that = (CancelledMoveState) o;
+        return Objects.equals(currentLogDir, that.currentLogDir) && 
Objects.equals(targetLogDir, that.targetLogDir);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(currentLogDir, targetLogDir);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
new file mode 100644
index 00000000000..df5fb890148
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * The completed replica log directory move state.
+ *
+ * <p>Carries only the target directory. {@link #done()} always returns {@code true},
+ * and equality and hash code are based on {@code targetLogDir} alone.
+ */
+final class CompletedMoveState implements LogDirMoveState {
+    public final String targetLogDir;
+
+    /**
+     * Stores the directory as given; no validation or null check is performed.
+     *
+     * @param targetLogDir        The log directory that we wanted the replica to move to.
+     */
+    public CompletedMoveState(String targetLogDir) {
+        this.targetLogDir = targetLogDir;
+    }
+
+    @Override
+    public boolean done() {
+        return true;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CompletedMoveState that = (CompletedMoveState) o;
+        return Objects.equals(targetLogDir, that.targetLogDir);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(targetLogDir);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/LogDirMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/LogDirMoveState.java
new file mode 100644
index 00000000000..983d6af0167
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/LogDirMoveState.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+/**
+ * The state of a replica log directory movement.
+ *
+ * <p>Package-private marker for the concrete move states; the only operation it
+ * exposes is {@link #done()}.
+ */
+interface LogDirMoveState {
+    /**
+     * True if the move is done without errors.
+     *
+     * <p>NOTE(review): CompletedMoveState and CancelledMoveState both return
+     * {@code true} here, and the cancelled state did not reach its target directory,
+     * so "done" presumably means "nothing left in flight" rather than "succeeded" -
+     * confirm against the callers.
+     */
+    boolean done();
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
new file mode 100644
index 00000000000..eb3f592841c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the source replica is missing.
+ *
+ * <p>NOTE(review): MissingReplicaMoveState is summarized as "the source log directory
+ * is missing", so the two summaries look swapped relative to the class names -
+ * confirm which wording belongs to which class.
+ *
+ * <p>{@link #done()} always returns {@code false}; equality and hash code are based
+ * on {@code targetLogDir} alone.
+ */
+final class MissingLogDirMoveState implements LogDirMoveState {
+    public final String targetLogDir;
+
+    /**
+     * Stores the directory as given; no validation or null check is performed.
+     *
+     * @param targetLogDir        The log directory that we wanted the replica to move to.
+     */
+    public MissingLogDirMoveState(String targetLogDir) {
+        this.targetLogDir = targetLogDir;
+    }
+
+    @Override
+    public boolean done() {
+        return false;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MissingLogDirMoveState that = (MissingLogDirMoveState) o;
+        return Objects.equals(targetLogDir, that.targetLogDir);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(targetLogDir);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
new file mode 100644
index 00000000000..eda9c22b829
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+
+/**
+ * A replica log directory move state where the source log directory is missing.
+ *
+ * <p>NOTE(review): MissingLogDirMoveState is summarized as "the source replica is
+ * missing", so the two summaries look swapped relative to the class names -
+ * confirm which wording belongs to which class.
+ *
+ * <p>{@link #done()} always returns {@code false}; equality and hash code are based
+ * on {@code targetLogDir} alone.
+ */
+final class MissingReplicaMoveState implements LogDirMoveState {
+    public final String targetLogDir;
+
+    /**
+     * Stores the directory as given; no validation or null check is performed.
+     *
+     * @param targetLogDir        The log directory that we wanted the replica to move to.
+     */
+    public MissingReplicaMoveState(String targetLogDir) {
+        this.targetLogDir = targetLogDir;
+    }
+
+    @Override
+    public boolean done() {
+        return false;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        MissingReplicaMoveState that = (MissingReplicaMoveState) o;
+        return Objects.equals(targetLogDir, that.targetLogDir);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(targetLogDir);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
new file mode 100644
index 00000000000..571c21f9858
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A partition movement.  The source and destination brokers may overlap.
+ *
+ * <p>The two sets are stored exactly as passed to the constructor (no copy is made),
+ * so later changes to a mutable set are visible through the public fields and also
+ * change the result of {@code equals}/{@code hashCode}, which are based on both sets.
+ */
+final class PartitionMove {
+    public final Set<Integer> sources;
+
+    public final Set<Integer> destinations;
+
+    /**
+     * Keeps references to the given sets; no validation or null check is performed.
+     *
+     * @param sources         The source brokers.
+     * @param destinations    The destination brokers.
+     */
+    public PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
+        this.sources = sources;
+        this.destinations = destinations;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PartitionMove that = (PartitionMove) o;
+        return Objects.equals(sources, that.sources) && 
Objects.equals(destinations, that.destinations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sources, destinations);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionReassignmentState.java b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionReassignmentState.java
new file mode 100644
index 00000000000..53ae960f863
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionReassignmentState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The state of a partition reassignment.  The current replicas and target replicas
+ * may overlap.
+ *
+ * <p>Both lists are stored exactly as passed to the constructor (no copy is made).
+ * Equality and hash code take the two lists and the {@code done} flag into account;
+ * because the replicas are lists, their element order matters for equality.
+ */
+final class PartitionReassignmentState {
+    public final List<Integer> currentReplicas;
+
+    public final List<Integer> targetReplicas;
+
+    public final boolean done;
+
+    /**
+     * Keeps references to the given lists; no validation or null check is performed.
+     *
+     * @param currentReplicas The current replicas.
+     * @param targetReplicas  The target replicas.
+     * @param done            True if the reassignment is done.
+     */
+    public PartitionReassignmentState(List<Integer> currentReplicas, 
List<Integer> targetReplicas, boolean done) {
+        this.currentReplicas = currentReplicas;
+        this.targetReplicas = targetReplicas;
+        this.done = done;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PartitionReassignmentState state = (PartitionReassignmentState) o;
+        return done == state.done && Objects.equals(currentReplicas, 
state.currentReplicas) && Objects.equals(targetReplicas, state.targetReplicas);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(currentReplicas, targetReplicas, done);
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
new file mode 100644
index 00000000000..d0e97c036dd
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandOptions.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+
+public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
+    /*
+     * Command-line options of the partition reassignment tool. Every spec below is
+     * registered on the parser field in the constructor and the parse result is stored
+     * in the options field; neither field is declared in this class, so both are
+     * presumably inherited from CommandDefaultOptions. Each spec also has a public
+     * accessor of the same name further down that simply returns the field.
+     *
+     * Actions: specs registered with a description only (no withRequiredArg call).
+     */
+    final OptionSpec<?> verifyOpt;
+    final OptionSpec<?> generateOpt;
+    final OptionSpec<?> executeOpt;
+    final OptionSpec<?> cancelOpt;
+    final OptionSpec<?> listOpt;
+
+    // Arguments: values and modifier flags that parameterize the actions above.
+    final OptionSpec<String> bootstrapServerOpt;
+    final OptionSpec<String> commandConfigOpt;
+    final OptionSpec<String> reassignmentJsonFileOpt;
+    final OptionSpec<String> topicsToMoveJsonFileOpt;
+    final OptionSpec<String> brokerListOpt;
+    final OptionSpec<?> disableRackAware;
+    final OptionSpec<Long> interBrokerThrottleOpt;
+    final OptionSpec<Long> replicaAlterLogDirsThrottleOpt;
+    final OptionSpec<Long> timeoutOpt;
+    final OptionSpec<?> additionalOpt;
+    final OptionSpec<?> preserveThrottlesOpt;
+
+    public ReassignPartitionsCommandOptions(String[] args) {
+        super(args);
+
+        verifyOpt = parser.accepts("verify", "Verify if the reassignment 
completed as specified by the " +
+            "--reassignment-json-file option. If there is a throttle engaged 
for the replicas specified, and the rebalance has completed, the throttle will 
be removed");
+        generateOpt = parser.accepts("generate", "Generate a candidate 
partition reassignment configuration." +
+            " Note that this only generates a candidate assignment, it does 
not execute it.");
+        executeOpt = parser.accepts("execute", "Kick off the reassignment as 
specified by the --reassignment-json-file option.");
+        cancelOpt = parser.accepts("cancel", "Cancel an active reassignment.");
+        listOpt = parser.accepts("list", "List all active partition 
reassignments.");
+
+        /*
+         * Arguments. The two throttle options default to -1 and the timeout option
+         * defaults to 10000; all three are typed as Long.
+         *
+         * NOTE(review): the reassignment-json-file description below joins the words
+         * "configuration" and "The format to use is" with no space or period between
+         * them, and says "absolution path" where "absolute path" is presumably meant.
+         * Both are part of the string passed to parser.accepts, i.e. runtime text, so
+         * they are deliberately left untouched by this documentation-only change.
+         */
+        bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the 
server(s) to use for bootstrapping.")
+            .withRequiredArg()
+            .describedAs("Server(s) to use for bootstrapping")
+            .ofType(String.class);
+
+        commandConfigOpt = parser.accepts("command-config", "Property file 
containing configs to be passed to Admin Client.")
+            .withRequiredArg()
+            .describedAs("Admin client property file")
+            .ofType(String.class);
+
+        reassignmentJsonFileOpt = parser.accepts("reassignment-json-file", 
"The JSON file with the partition reassignment configuration" +
+                "The format to use is - \n" +
+                "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t  \"partition\": 
1,\n\t  \"replicas\": [1,2,3],\n\t  \"log_dirs\": [\"dir1\",\"dir2\",\"dir3\"] 
}],\n\"version\":1\n}\n" +
+                "Note that \"log_dirs\" is optional. When it is specified, its 
length must equal the length of the replicas list. The value in this list " +
+                "can be either \"any\" or the absolution path of the log 
directory on the broker. If absolute log directory path is specified, the 
replica will be moved to the specified log directory on the broker.")
+            .withRequiredArg()
+            .describedAs("manual assignment json file path")
+            .ofType(String.class);
+        topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", 
"Generate a reassignment configuration to move the partitions" +
+                " of the specified topics to the list of brokers specified by 
the --broker-list option. The format to use is - \n" +
+                "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": 
\"foo1\"}],\n\"version\":1\n}")
+            .withRequiredArg()
+            .describedAs("topics to reassign json file path")
+            .ofType(String.class);
+        brokerListOpt = parser.accepts("broker-list", "The list of brokers to 
which the partitions need to be reassigned" +
+                " in the form \"0,1,2\". This is required if 
--topics-to-move-json-file is used to generate reassignment configuration")
+            .withRequiredArg()
+            .describedAs("brokerlist")
+            .ofType(String.class);
+        disableRackAware = parser.accepts("disable-rack-aware", "Disable rack 
aware replica assignment");
+        interBrokerThrottleOpt = parser.accepts("throttle", "The movement of 
partitions between brokers will be throttled to this value (bytes/sec). " +
+                "This option can be included with --execute when a 
reassignment is started, and it can be altered by resubmitting the current 
reassignment " +
+                "along with the --additional flag. The throttle rate should be 
at least 1 KB/s.")
+            .withRequiredArg()
+            .describedAs("throttle")
+            .ofType(Long.class)
+            .defaultsTo(-1L);
+        replicaAlterLogDirsThrottleOpt = 
parser.accepts("replica-alter-log-dirs-throttle",
+                "The movement of replicas between log directories on the same 
broker will be throttled to this value (bytes/sec). " +
+                    "This option can be included with --execute when a 
reassignment is started, and it can be altered by resubmitting the current 
reassignment " +
+                    "along with the --additional flag. The throttle rate 
should be at least 1 KB/s.")
+            .withRequiredArg()
+            .describedAs("replicaAlterLogDirsThrottle")
+            .ofType(Long.class)
+            .defaultsTo(-1L);
+        timeoutOpt = parser.accepts("timeout", "The maximum time in ms to wait 
for log directory replica assignment to begin.")
+            .withRequiredArg()
+            .describedAs("timeout")
+            .ofType(Long.class)
+            .defaultsTo(10000L);
+        additionalOpt = parser.accepts("additional", "Execute this 
reassignment in addition to any " +
+            "other ongoing ones. This option can also be used to change the 
throttle of an ongoing reassignment.");
+        preserveThrottlesOpt = parser.accepts("preserve-throttles", "Do not 
modify broker or topic throttles.");
+
+        options = parser.parse(args);
+    }
+
+    public OptionSpec<?> verifyOpt() {
+        return verifyOpt;
+    }
+
+    public OptionSpec<?> generateOpt() {
+        return generateOpt;
+    }
+
+    public OptionSpec<?> executeOpt() {
+        return executeOpt;
+    }
+
+    public OptionSpec<?> cancelOpt() {
+        return cancelOpt;
+    }
+
+    public OptionSpec<?> listOpt() {
+        return listOpt;
+    }
+
+    public OptionSpec<String> bootstrapServerOpt() {
+        return bootstrapServerOpt;
+    }
+
+    public OptionSpec<String> commandConfigOpt() {
+        return commandConfigOpt;
+    }
+
+    public OptionSpec<String> reassignmentJsonFileOpt() {
+        return reassignmentJsonFileOpt;
+    }
+
+    public OptionSpec<String> topicsToMoveJsonFileOpt() {
+        return topicsToMoveJsonFileOpt;
+    }
+
+    public OptionSpec<String> brokerListOpt() {
+        return brokerListOpt;
+    }
+
+    public OptionSpec<?> disableRackAware() {
+        return disableRackAware;
+    }
+
+    public OptionSpec<Long> interBrokerThrottleOpt() {
+        return interBrokerThrottleOpt;
+    }
+
+    public OptionSpec<Long> replicaAlterLogDirsThrottleOpt() {
+        return replicaAlterLogDirsThrottleOpt;
+    }
+
+    public OptionSpec<Long> timeoutOpt() {
+        return timeoutOpt;
+    }
+
+    public OptionSpec<?> additionalOpt() {
+        return additionalOpt;
+    }
+
+    public OptionSpec<?> preserveThrottlesOpt() {
+        return preserveThrottlesOpt;
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
new file mode 100644
index 00000000000..c20cfd1029c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools.reassign;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A result returned from verifyAssignment.
+ *
+ * <p>Bundles the per-partition reassignment states and the per-replica log directory
+ * move states together with two flags saying whether anything of each kind is still
+ * ongoing. The one-argument constructor supplies {@code false}, an empty map and
+ * {@code false} for the remaining three values. Both maps are stored as passed,
+ * without copying, and all four fields take part in {@code equals}/{@code hashCode}.
+ */
+public final class VerifyAssignmentResult {
+    public final Map<TopicPartition, PartitionReassignmentState> partStates;
+    public final boolean partsOngoing;
+    public final Map<TopicPartitionReplica, LogDirMoveState> moveStates;
+    public final boolean movesOngoing;
+
+    public VerifyAssignmentResult(Map<TopicPartition, 
PartitionReassignmentState> partStates) {
+        this(partStates, false, Collections.emptyMap(), false);
+    }
+
+    /**
+     * @param partStates    A map from partitions to reassignment states.
+     * @param partsOngoing  True if there are any ongoing partition reassignments.
+     * @param moveStates    A map from partition replicas to log directory movement states.
+     * @param movesOngoing  True if there are any ongoing moves that we know about.
+     */
+    public VerifyAssignmentResult(
+        Map<TopicPartition, PartitionReassignmentState> partStates,
+        boolean partsOngoing,
+        Map<org.apache.kafka.common.TopicPartitionReplica, LogDirMoveState> 
moveStates,
+        boolean movesOngoing
+    ) {
+        this.partStates = partStates;
+        this.partsOngoing = partsOngoing;
+        this.moveStates = moveStates;
+        this.movesOngoing = movesOngoing;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        VerifyAssignmentResult that = (VerifyAssignmentResult) o;
+        return partsOngoing == that.partsOngoing && movesOngoing == 
that.movesOngoing && Objects.equals(partStates, that.partStates) && 
Objects.equals(moveStates, that.moveStates);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partStates, partsOngoing, moveStates, 
movesOngoing);
+    }
+}

Reply via email to