This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f895ab51450 KAFKA-16455: Check partition exists before send 
reassignments to server in ReassignPartitionsCommand (#15659)
f895ab51450 is described below

commit f895ab5145077c5efa10a4a898628d901b01e2c2
Author: Kuan-Po (Cooper) Tseng <brandb...@gmail.com>
AuthorDate: Tue Apr 9 07:56:31 2024 +0800

    KAFKA-16455: Check partition exists before send reassignments to server in 
ReassignPartitionsCommand (#15659)
    
    Currently, when executing kafka-reassign-partitions.sh with the --execute 
option, if a partition number specified in the JSON file does not exist, this 
check occurs only when submitting the reassignments to 
alterPartitionReassignments on the server-side.
    
    We can perform this check in advance before submitting the reassignments to 
the server side.
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../apache/kafka/tools/reassign/ReassignPartitionsCommand.java   | 9 ++++++++-
 .../apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java  | 8 ++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index a23a25e6f57..1f63a23b290 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -641,7 +641,7 @@ public class ReassignPartitionsCommand {
      * @param adminClient     The AdminClient to use.
      * @param partitions      The partitions to get information about.
      * @return                A map from partitions to broker assignments.
-     *                        If any topic can't be found, an exception will 
be thrown.
+     *                        If any topic or partition can't be found, an 
exception will be thrown.
      */
     static Map<TopicPartition, List<Integer>> 
getReplicaAssignmentForPartitions(Admin adminClient,
                                                                                
 Set<TopicPartition> partitions
@@ -654,6 +654,13 @@ public class ReassignPartitionsCommand {
                     res.put(tp, 
info.replicas().stream().map(Node::id).collect(Collectors.toList()));
             })
         );
+
+        if (!res.keySet().equals(partitions)) {
+            Set<TopicPartition> missingPartitions = new HashSet<>(partitions);
+            missingPartitions.removeAll(res.keySet());
+            throw new ExecutionException(new 
UnknownTopicOrPartitionException("Unable to find partition: " +
+                
missingPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(",
 "))));
+        }
         return res;
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index c6f145d9a55..9d248ee2072 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -83,6 +83,7 @@ import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitio
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -300,6 +301,13 @@ public class ReassignPartitionsUnitTest {
 
             assertEquals(assignments,
                 getReplicaAssignmentForPartitions(adminClient, new 
HashSet<>(asList(new TopicPartition("foo", 0), new TopicPartition("bar", 0)))));
+
+            UnknownTopicOrPartitionException exception =
+                assertInstanceOf(UnknownTopicOrPartitionException.class,
+                    assertThrows(ExecutionException.class,
+                        () -> getReplicaAssignmentForPartitions(adminClient,
+                            new HashSet<>(asList(new TopicPartition("foo", 0), 
new TopicPartition("foo", 10))))).getCause());
+            assertEquals("Unable to find partition: foo-10", 
exception.getMessage());
         }
     }
 

Reply via email to