This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51f2c7b2b65 MINOR: fix reassign command bug (#20003)
51f2c7b2b65 is described below
commit 51f2c7b2b650ab24f4dcf557e32f691b73cc71d0
Author: Lan Ding <[email protected]>
AuthorDate: Wed Jun 25 02:34:13 2025 +0800
MINOR: fix reassign command bug (#20003)
See the original Scala implementation:
https://github.com/apache/kafka/blob/9570c67b8c4ed1a4c3888511adad58d9b3a8bc0f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L1208
During the rewrite for
[KAFKA-14595](https://github.com/apache/kafka/pull/13247), the negation on
the `addingReplicas().isEmpty()` check was omitted, which inverted that
part of the condition.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../tools/reassign/ReassignPartitionsCommand.java | 4 +--
.../reassign/ReassignPartitionsCommandTest.java | 38 +++++++++++++++++++++-
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 72c49410e13..4893bb25830 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -1271,7 +1271,7 @@ public class ReassignPartitionsCommand {
Set<TopicPartition> targetPartsSet = targetParts.stream().map(t ->
t.getKey()).collect(Collectors.toSet());
Set<TopicPartition> curReassigningParts = new HashSet<>();
adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part,
reassignment) -> {
- if (reassignment.addingReplicas().isEmpty() ||
!reassignment.removingReplicas().isEmpty())
+ if (!reassignment.addingReplicas().isEmpty() ||
!reassignment.removingReplicas().isEmpty())
curReassigningParts.add(part);
});
if (!curReassigningParts.isEmpty()) {
@@ -1440,7 +1440,7 @@ public class ReassignPartitionsCommand {
}
OptionSpec<?> action = allActions.get(0);
-
+
if (opts.options.has(opts.bootstrapServerOpt) &&
opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please don't
specify both --bootstrap-server and --bootstrap-controller");
else if (!opts.options.has(opts.bootstrapServerOpt) &&
!opts.options.has(opts.bootstrapControllerOpt))
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 1ee55c6eace..069a64234c6 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -316,7 +316,7 @@ public class ReassignPartitionsCommandTest {
}
@ClusterTest
- public void testCancellationWithAddingReplicaInIsr() throws Exception {
+ public void testCancellationWithAddingAndRemovingReplicaInIsr() throws
Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 200);
@@ -351,6 +351,42 @@ public class ReassignPartitionsCommandTest {
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(),
foo0.partition(), 4));
}
+ @ClusterTest
+ public void testCancellationWithAddingReplicaInIsr() throws Exception {
+ createTopics();
+ TopicPartition foo0 = new TopicPartition("foo", 0);
+ produceMessages(foo0.topic(), foo0.partition(), 200);
+
+ // The reassignment will bring replicas 3 and 4 into the replica set.
+ String assignment = "{\"version\":1,\"partitions\":" +
+
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}"
+
+ "]}";
+
+ // We will throttle replica 4 so that only replica 3 joins the ISR
+ setReplicationThrottleForPartitions(foo0);
+
+ // Execute the assignment and wait for replica 3 (only) to join the ISR
+ runExecuteAssignment(false, assignment, -1L, -1L);
+ try (Admin admin =
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
+ TestUtils.waitForCondition(
+ () -> {
+ Set<Integer> isr =
admin.describeTopics(Collections.singleton(foo0.topic()))
+
.allTopicNames().get().get(foo0.topic()).partitions().stream()
+ .filter(p -> p.partition() == foo0.partition())
+ .flatMap(p -> p.isr().stream())
+ .map(Node::id).collect(Collectors.toSet());
+ return isr.containsAll(Arrays.asList(0, 1, 2, 3));
+ },
+ "Timed out while waiting for replica 3 to join the ISR"
+ );
+ }
+
+ // Now cancel the assignment and verify that the partition is removed
from cancelled replicas
+ assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0),
Collections.emptySet()), runCancelAssignment(assignment, true, true));
+ verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(),
foo0.partition(), 3));
+ verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(),
foo0.partition(), 4));
+ }
+
/**
* Test moving partitions between directories.
*/