This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f895ab51450 KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand (#15659) f895ab51450 is described below commit f895ab5145077c5efa10a4a898628d901b01e2c2 Author: Kuan-Po (Cooper) Tseng <brandb...@gmail.com> AuthorDate: Tue Apr 9 07:56:31 2024 +0800 KAFKA-16455: Check partition exists before send reassignments to server in ReassignPartitionsCommand (#15659) Currently, when executing kafka-reassign-partitions.sh with the --execute option, if a partition number specified in the JSON file does not exist, the problem is detected only when the reassignments are submitted to alterPartitionReassignments on the server side. We can instead verify that every partition exists in advance, before submitting the reassignments to the server side. Reviewers: Luke Chen <show...@gmail.com> --- .../apache/kafka/tools/reassign/ReassignPartitionsCommand.java | 9 ++++++++- .../apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java | 8 ++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index a23a25e6f57..1f63a23b290 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -641,7 +641,7 @@ public class ReassignPartitionsCommand { * @param adminClient The AdminClient to use. * @param partitions The partitions to get information about. * @return A map from partitions to broker assignments. - * If any topic can't be found, an exception will be thrown. + * If any topic or partition can't be found, an exception will be thrown. 
*/ static Map<TopicPartition, List<Integer>> getReplicaAssignmentForPartitions(Admin adminClient, Set<TopicPartition> partitions @@ -654,6 +654,13 @@ public class ReassignPartitionsCommand { res.put(tp, info.replicas().stream().map(Node::id).collect(Collectors.toList())); }) ); + + if (!res.keySet().equals(partitions)) { + Set<TopicPartition> missingPartitions = new HashSet<>(partitions); + missingPartitions.removeAll(res.keySet()); + throw new ExecutionException(new UnknownTopicOrPartitionException("Unable to find partition: " + + missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")))); + } return res; } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index c6f145d9a55..9d248ee2072 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -83,6 +83,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitio import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -300,6 +301,13 @@ public class ReassignPartitionsUnitTest { assertEquals(assignments, getReplicaAssignmentForPartitions(adminClient, new HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0))))); + + UnknownTopicOrPartitionException exception = + assertInstanceOf(UnknownTopicOrPartitionException.class, + assertThrows(ExecutionException.class, + () -> getReplicaAssignmentForPartitions(adminClient, + new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("foo", 10))))).getCause()); + assertEquals("Unable to find partition: foo-10", exception.getMessage()); } }