nizhikov commented on code in PR #14456:
URL: https://github.com/apache/kafka/pull/14456#discussion_r1341916682


##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java:
##########
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import kafka.admin.ReassignPartitionsCommand;
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.server.HostedPartition;
+import kafka.server.IsrChangePropagationConfig;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.server.ZkAlterPartitionManager;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.None$;
+import scala.Option;
+import scala.Some$;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("ClassFanOutComplexity")
+@Timeout(300)
+public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
+    ReassignPartitionsTestCluster cluster;
+
    @AfterEach
    @Override
    public void tearDown() {
        // Close the test cluster first (best-effort; errors are logged, not thrown),
        // then let the quorum harness release ZK/KRaft resources.
        Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster");
        super.tearDown();
    }
+
+    private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = 
new HashMap<>(); {
+        IntStream.range(0, 4).forEach(brokerId -> {
+            Map<String, Long> brokerConfig = new HashMap<>();
+
+            ReassignPartitionsCommand.brokerLevelThrottles().foreach(throttle 
-> {
+                brokerConfig.put(throttle, -1L);
+                return null;
+            });
+
+            unthrottledBrokerConfigs.put(brokerId, brokerConfig);
+        });
+    }
+
    /**
     * End-to-end reassignment of two partitions on a default-configured cluster,
     * exercised under both ZK and KRaft quorums.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        executeAndVerifyReassignment();
    }
+
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
    public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception {
        // Test reassignment when the IBP is on an older version which does not use
        // the `AlterPartition` API. In this case, the controller will register individual
        // watches for each reassigning partition so that the reassignment can be
        // completed as soon as the ISR is expanded.
        Map<String, String> configOverrides = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
        cluster = new ReassignPartitionsTestCluster(configOverrides, Collections.emptyMap());
        cluster.setup();
        executeAndVerifyReassignment();
    }
+
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
    public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception {
        // Test reassignment during a partial upgrade when some brokers are relying on
        // `AlterPartition` and some rely on the old notification logic through Zookeeper.
        // In this test case, broker 0 starts up first on the latest IBP and is typically
        // elected as controller. The three remaining brokers start up on the older IBP.
        // We want to ensure that reassignment can still complete through the ISR change
        // notification path even though the controller expects `AlterPartition`.

        // Override change notification settings so that test is not delayed by ISR
        // change notification delay
        // NOTE(review): this mutates process-global state on ZkAlterPartitionManager
        // and is not restored afterwards — later tests in the same JVM see it.
        ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));

        Map<String, String> oldIbpConfig = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
        Map<Integer, Map<String, String>> brokerConfigOverrides = new HashMap<>();
        brokerConfigOverrides.put(1, oldIbpConfig);
        brokerConfigOverrides.put(2, oldIbpConfig);
        brokerConfigOverrides.put(3, oldIbpConfig);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), brokerConfigOverrides);
        cluster.setup();

        executeAndVerifyReassignment();
    }
+
    /**
     * Shared body for the reassignment tests: moves foo-0 from [0,1,2] to [0,1,3]
     * and bar-0 from [3,2,1] to [3,2,0], then verifies completion, that no broker
     * throttles were applied, and that the removed replicas are deleted.
     */
    private void executeAndVerifyReassignment() throws ExecutionException, InterruptedException, JsonProcessingException {
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition bar0 = new TopicPartition("bar", 0);

        // Check that the assignment has not been started yet.
        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();

        initialAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
        initialAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));

        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(initialAssignment));

        // Execute the assignment (no inter-broker or log-dir throttle: -1 means unset).
        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
        finalAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));

        kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false);
        assertFalse(verifyAssignmentResult.movesOngoing());

        // Wait for the assignment to complete
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));

        assertEquals(unthrottledBrokerConfigs,
            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        // Verify that partitions are removed from brokers no longer assigned
        verifyReplicaDeleted(foo0, 2);
        verifyReplicaDeleted(bar0, 1);
    }
+
    /**
     * Verify that after reassigning foo-0 so broker 3 becomes the new leader,
     * the high watermark set on the old leader (123) is carried over to broker 3.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Set the high water mark of foo-0 to 123 on its leader.
        TopicPartition part = new TopicPartition("foo", 0);
        cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, None$.empty());

        // Execute the assignment
        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
            new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true));

        // Wait for the assignment to complete
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));

        // Broker 3 heads the new replica list, so wait until it holds the leader log locally.
        TestUtils.waitUntilTrue(() ->
                cluster.servers.get(3).replicaManager().onlinePartition(part).
                    map(Partition::leaderLogIfLocal).isDefined(),
            () -> "broker 3 should be the new leader", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 10L);
        assertEquals(123L, cluster.servers.get(3).replicaManager().localLogOrException(part).highWatermark(),
            "Expected broker 3 to have the correct high water mark for the partition.");
    }
+
    /**
     * Start a reassignment with a tiny throttle (so it cannot finish), then re-run
     * --execute with --additional to raise the throttle and verify the new value
     * takes effect, the reassignment completes, and the throttles are removed.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testAlterReassignmentThrottle(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("foo", 0, 50);
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Execute the assignment with a low throttle
        long initialThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, assignment, initialThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle);

        // Now update the throttle and verify the reassignment completes
        long updatedThrottle = 300000L;
        runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle);

        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Now remove the throttles.
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }
+
    /**
     * Test running a reassignment with the interBrokerThrottle set.
     * While the moves are ongoing the throttle must stay applied on brokers 0-3;
     * a --verify run with --preserve-throttles keeps them, and a final --verify
     * without it removes them.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testThrottledReassignment(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("foo", 0, 50);
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // Check that the assignment has not been started yet.
        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
        initialAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
        initialAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));
        assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), runVerifyAssignment(cluster.adminClient, assignment, false));
        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));

        // Execute the assignment
        long interBrokerThrottle = 300000L;
        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
        finalAssignment.put(new TopicPartition("foo", 0),
            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
        finalAssignment.put(new TopicPartition("baz", 2),
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));

        // Wait for the assignment to complete. While it is still ongoing, assert on
        // every poll that the target replicas are as expected and the throttle holds.
        // Checked exceptions are tunnelled out of the lambda as RuntimeException.
        TestUtils.waitUntilTrue(
            () -> {
                try {
                    // Check the reassignment status.
                    kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true);

                    if (!result.partsOngoing()) {
                        return true;
                    } else {
                        assertFalse(
                            result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done),
                            "Expected at least one partition reassignment to be ongoing when result = " + result
                        );
                        assertEquals(seq(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).get().targetReplicas());
                        assertEquals(seq(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).get().targetReplicas());
                        System.out.println("Current result: " + result);
                        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
                        return false;
                    }
                } catch (ExecutionException | InterruptedException | JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            }, () -> "Expected reassignment to complete.", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L);
        waitForVerifyAssignment(cluster.adminClient, assignment, true,
            new VerifyAssignmentResult(finalAssignment));
        // The throttles should still have been preserved, since we ran with --preserve-throttles
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
        // Now remove the throttles.
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
    }
+
    /**
     * Verify that producing and consuming keep working while a (throttled, hence
     * slow) reassignment of baz-2 is in progress, then that the reassignment
     * completes once the throttle is removed.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testProduceAndConsumeWithReassignmentInProgress(String quorum) throws Exception {
        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages("baz", 2, 60);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";
        // Throttle of 300 bytes/s keeps the reassignment running while we produce/consume.
        runExecuteAssignment(cluster.adminClient, false, assignment, 300L, -1L);
        cluster.produceMessages("baz", 2, 100);
        Consumer<byte[], byte[]> consumer = TestUtils.createConsumer(cluster.brokerList,
            "group",
            "earliest",
            true,
            false,
            500,
            SecurityProtocol.PLAINTEXT,
            None$.empty(),
            None$.empty(),
            new ByteArrayDeserializer(),
            new ByteArrayDeserializer()
        );

        TopicPartition part = new TopicPartition("baz", 2);
        try {
            consumer.assign(Collections.singleton(part));
            TestUtils.pollUntilAtLeastNumRecords(consumer, 100, org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS);
        } finally {
            consumer.close();
        }
        // Drop the throttle so the reassignment can finish.
        TestUtils.removeReplicationThrottleForPartitions(cluster.adminClient, seq(0, 1, 2, 3), set(part));
        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
        waitForVerifyAssignment(cluster.adminClient, assignment, false,
            new VerifyAssignmentResult(finalAssignment));
    }
+
    /**
     * Test running a reassignment and then cancelling it.
     * The throttle is set to 1 byte/s so the moves cannot complete before the
     * cancel; afterwards the added replicas must be deleted from brokers.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testCancellation(String quorum) throws Exception {
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition baz1 = new TopicPartition("baz", 1);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);
        cluster.produceMessages(baz1.topic(), baz1.partition(), 200);
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
            "{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";
        assertEquals(unthrottledBrokerConfigs,
            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
        long interBrokerThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);

        Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

        partStates.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3, 2), Arrays.asList(0, 1, 3), false));
        partStates.put(baz1, new PartitionReassignmentState(Arrays.asList(0, 2, 3, 1), Arrays.asList(0, 2, 3), false));

        // Verify that the reassignment is running.  The very low throttle should keep it
        // from completing before this runs.
        waitForVerifyAssignment(cluster.adminClient, assignment, true,
            new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
        // Cancel the reassignment.
        assertEquals(new Tuple2<>(set(foo0, baz1), set()), runCancelAssignment(cluster.adminClient, assignment, true));
        // Broker throttles are still active because we passed --preserve-throttles
        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
        // Cancelling the reassignment again should reveal nothing to cancel.
        assertEquals(new Tuple2<>(set(), set()), runCancelAssignment(cluster.adminClient, assignment, false));
        // This time, the broker throttles were removed.
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
        // Verify that there are no ongoing reassignments.
        assertFalse(runVerifyAssignment(cluster.adminClient, assignment, false).partsOngoing());
        // Verify that the partition is removed from cancelled replicas
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(baz1, 3);
    }
+
    /**
     * Cancel a reassignment while one of the adding replicas (3) has already
     * joined the ISR and the other (4) is still throttled out of it; both adding
     * replicas must be deleted after the cancellation.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    public void testCancellationWithAddingReplicaInIsr(String quorum) throws Exception {
        TopicPartition foo0 = new TopicPartition("foo", 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(foo0.topic(), foo0.partition(), 200);

        // The reassignment will bring replicas 3 and 4 into the replica set and remove 1 and 2.
        String assignment = "{\"version\":1,\"partitions\":" +
            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
            "]}";

        // We will throttle replica 4 so that only replica 3 joins the ISR
        TestUtils.setReplicationThrottleForPartitions(
            cluster.adminClient,
            seq(4),
            set(foo0),
            1
        );

        // Execute the assignment and wait for replica 3 (only) to join the ISR
        runExecuteAssignment(
            cluster.adminClient,
            false,
            assignment,
            -1L,
            -1L
        );
        TestUtils.waitUntilTrue(
            () -> Objects.equals(TestUtils.currentIsr(cluster.adminClient, foo0), set(0, 1, 2, 3)),
            () -> "Timed out while waiting for replica 3 to join the ISR",
            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L
        );

        // Now cancel the assignment and verify that the partition is removed from cancelled replicas
        assertEquals(new Tuple2<>(set(foo0), set()), runCancelAssignment(cluster.adminClient, assignment, true));
        verifyReplicaDeleted(foo0, 3);
        verifyReplicaDeleted(foo0, 4);
    }
+
+    private void verifyReplicaDeleted(
+        TopicPartition topicPartition,
+        Integer replicaId
+    ) {
+        TestUtils.waitUntilTrue(
+            () -> {
+                KafkaBroker server = cluster.servers.get(replicaId);
+                HostedPartition partition = 
server.replicaManager().getPartition(topicPartition);
+                Option<UnifiedLog> log = 
server.logManager().getLog(topicPartition, false);
+                return partition == HostedPartition.None$.MODULE$ && 
log.isEmpty();
+            },
+            () -> "Timed out waiting for replica " + replicaId + " of " + 
topicPartition + " to be deleted",
+            org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+            100L
+        );
+    }
+
+    private void waitForLogDirThrottle(Set<Integer> throttledBrokers, Long 
logDirThrottle) {
+        Map<String, Long> throttledConfigMap = new HashMap<>();
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelLeaderThrottle(), 
-1L);
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelFollowerThrottle(), 
-1L);
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelLogDirThrottle(), 
logDirThrottle);
+        waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+    }
+
+    private void waitForInterBrokerThrottle(List<Integer> throttledBrokers, 
Long interBrokerThrottle) {
+        Map<String, Long> throttledConfigMap = new HashMap<>();
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelLeaderThrottle(), 
interBrokerThrottle);
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelFollowerThrottle(), 
interBrokerThrottle);
+        
throttledConfigMap.put(ReassignPartitionsCommand.brokerLevelLogDirThrottle(), 
-1L);
+        waitForBrokerThrottles(throttledBrokers, throttledConfigMap);
+    }
+
+    private void waitForBrokerThrottles(Collection<Integer> throttledBrokers, 
Map<String, Long> throttleConfig) {
+        Map<Integer, Map<String, Long>> throttledBrokerConfigs = new 
HashMap<>();
+        unthrottledBrokerConfigs.forEach((brokerId, unthrottledConfig) -> {
+            Map<String, Long> expectedThrottleConfig = 
throttledBrokers.contains(brokerId)
+                ? throttleConfig
+                : unthrottledConfig;
+            throttledBrokerConfigs.put(brokerId, expectedThrottleConfig);
+        });
+        waitForBrokerLevelThrottles(throttledBrokerConfigs);
+    }
+
    /**
     * Poll the brokers' describeConfigs output until it equals {@code targetThrottles}.
     * The latest observed value is kept in an AtomicReference so the timeout
     * message can report what was actually seen; checked exceptions from the
     * admin call are tunnelled out of the lambda as RuntimeException.
     */
    private void waitForBrokerLevelThrottles(Map<Integer, Map<String, Long>> targetThrottles) {
        AtomicReference<Map<Integer, Map<String, Long>>> curThrottles = new AtomicReference<>(new HashMap<>());
        TestUtils.waitUntilTrue(() -> {
            try {
                curThrottles.set(describeBrokerLevelThrottles(targetThrottles.keySet()));
                return targetThrottles.equals(curThrottles.get());
            } catch (ExecutionException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, () -> "timed out waiting for broker throttle to become " + targetThrottles + ".  " +
            "Latest throttles were " + curThrottles.get(), org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 25);
    }
+
    /**
     * Describe the broker-level throttles in the cluster.
     *
     * @param brokerIds       the broker IDs to query via the admin client.
     * @return                A map whose keys are broker IDs and whose values are throttle
     *                        information.  The nested maps are keyed on throttle name;
     *                        a throttle that is not set is reported as -1.
     * @throws ExecutionException   if the describeConfigs call fails.
     * @throws InterruptedException if interrupted while waiting for the result.
     */
    private Map<Integer, Map<String, Long>> describeBrokerLevelThrottles(Collection<Integer> brokerIds) throws ExecutionException, InterruptedException {
        Map<Integer, Map<String, Long>> results = new HashMap<>();
        for (Integer brokerId : brokerIds) {
            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
            Config brokerConfigs = cluster.adminClient.describeConfigs(Collections.singleton(brokerResource)).values()
                .get(brokerResource)
                .get();

            Map<String, Long> throttles = new HashMap<>();
            ReassignPartitionsCommand.brokerLevelThrottles().foreach(throttleName -> {
                // Missing config entries mean the throttle is unset; normalize to -1.
                String configValue = Optional.ofNullable(brokerConfigs.get(throttleName)).map(ConfigEntry::value).orElse("-1");
                throttles.put(throttleName, Long.parseLong(configValue));
                return null;
            });
            results.put(brokerId, throttles);
        }
        return results;
    }
+
    /**
     * Test moving partitions between directories.
     * A 1 byte/s log-dir throttle keeps the intra-broker move observable mid-flight;
     * removing the throttle lets it complete, which is confirmed via describeLogDirs.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
    public void testLogDirReassignment(String quorum) throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);

        int targetBrokerId = 0;
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);

        // Start the replica move, but throttle it to be very slow so that it can't complete
        // before our next checks happen.
        long logDirThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, reassignment.json,
            -1L, logDirThrottle);

        // Check the output of --verify
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
            new VerifyAssignmentResult(Collections.singletonMap(
                topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
            ), false, Collections.singletonMap(
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
                new ActiveMoveState(reassignment.currentDir, reassignment.targetDir, reassignment.targetDir)
            ), true));
        waitForLogDirThrottle(Collections.singleton(0), logDirThrottle);

        // Remove the throttle
        cluster.adminClient.incrementalAlterConfigs(Collections.singletonMap(
                new ConfigResource(ConfigResource.Type.BROKER, "0"),
                Collections.singletonList(new AlterConfigOp(
                    new ConfigEntry(ReassignPartitionsCommand.brokerLevelLogDirThrottle(), ""), AlterConfigOp.OpType.DELETE))))
            .all().get();
        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);

        // Wait for the directory movement to complete.
        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
            new VerifyAssignmentResult(Collections.singletonMap(
                topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
            ), false, Collections.singletonMap(
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), 0),
                new CompletedMoveState(reassignment.targetDir)
            ), false));

        // Confirm via describeLogDirs that the replica now lives in the target directory.
        BrokerDirs info1 = new BrokerDirs(cluster.adminClient.describeLogDirs(IntStream.range(0, 4).boxed().collect(Collectors.toList())), 0);
        assertEquals(reassignment.targetDir, info1.curLogDirs.getOrDefault(topicPartition, ""));
    }
+
    /**
     * Start a log-dir move with a 1 byte/s throttle so it stalls, then re-run
     * --execute with --additional to raise the throttle and verify the move completes.
     */
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = "zk") // JBOD not yet implemented for KRaft
    public void testAlterLogDirReassignmentThrottle(String quorum) throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 0);

        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
        cluster.setup();
        cluster.produceMessages(topicPartition.topic(), topicPartition.partition(), 700);

        int targetBrokerId = 0;
        List<Integer> replicas = Arrays.asList(0, 1, 2);
        LogDirReassignment reassignment = buildLogDirReassignment(topicPartition, targetBrokerId, replicas);

        // Start the replica move with a low throttle so it does not complete
        long initialLogDirThrottle = 1L;
        runExecuteAssignment(cluster.adminClient, false, reassignment.json,
            -1L, initialLogDirThrottle);
        waitForLogDirThrottle(new HashSet<>(Collections.singletonList(0)), initialLogDirThrottle);

        // Now increase the throttle and verify that the log dir movement completes
        long updatedLogDirThrottle = 3000000L;
        runExecuteAssignment(cluster.adminClient, true, reassignment.json,
            -1L, updatedLogDirThrottle);
        waitForLogDirThrottle(Collections.singleton(0), updatedLogDirThrottle);

        waitForVerifyAssignment(cluster.adminClient, reassignment.json, true,
            new VerifyAssignmentResult(Collections.singletonMap(
                topicPartition, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 2), true)
            ), false, Collections.singletonMap(
                new TopicPartitionReplica(topicPartition.topic(), topicPartition.partition(), targetBrokerId),
                new CompletedMoveState(reassignment.targetDir)
            ), false));
    }
+
+    static class LogDirReassignment {
+        final String json;
+        final String currentDir;
+        final String targetDir;
+
+        public LogDirReassignment(String json, String currentDir, String 
targetDir) {
+            this.json = json;
+            this.currentDir = currentDir;
+            this.targetDir = targetDir;
+        }
+    }
+
+    private LogDirReassignment buildLogDirReassignment(TopicPartition 
topicPartition,
+                                                       int brokerId,
+                                                       List<Integer> replicas) 
throws ExecutionException, InterruptedException {
+
+        DescribeLogDirsResult describeLogDirsResult = 
cluster.adminClient.describeLogDirs(
+            IntStream.range(0, 4).boxed().collect(Collectors.toList()));
+
+        BrokerDirs logDirInfo = new BrokerDirs(describeLogDirsResult, 
brokerId);
+        assertTrue(logDirInfo.futureLogDirs.isEmpty());
+
+        String currentDir = logDirInfo.curLogDirs.get(topicPartition);
+        String newDir = logDirInfo.logDirs.stream().filter(dir -> 
!dir.equals(currentDir)).findFirst().get();
+
+        List<String> logDirs = replicas.stream().map(replicaId -> {
+            if (replicaId == brokerId)
+                return "\"" + newDir + "\"";
+            else
+                return "\"any\"";
+        }).collect(Collectors.toList());
+
+        String reassignmentJson =
+            " { \"version\": 1," +
+                "  \"partitions\": [" +
+                "    {" +
+                "     \"topic\": \"" + topicPartition.topic() + "\"," +
+                "     \"partition\": " + topicPartition.partition() + "," +
+                "     \"replicas\": [" + 
replicas.stream().map(Object::toString).collect(Collectors.joining(",")) + "]," 
+
+                "     \"log_dirs\": [" + String.join(",", logDirs) + "]" +
+                "    }" +
+                "   ]" +
+                "  }";
+
+        return new LogDirReassignment(reassignmentJson, currentDir, newDir);
+    }
+
+
+
+    private kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
runVerifyAssignment(Admin adminClient, String jsonString,
+                                                       Boolean 
preserveThrottles) throws ExecutionException, InterruptedException, 
JsonProcessingException {
+        System.out.println("==> verifyAssignment(adminClient, jsonString=" + 
jsonString);
+        return ReassignPartitionsCommand.verifyAssignment(adminClient, 
jsonString, preserveThrottles);
+    }
+
+    private void waitForVerifyAssignment(Admin adminClient,
+                                         String jsonString,
+                                         Boolean preserveThrottles,
+                                         VerifyAssignmentResult 
expectedResult) {
+        final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult 
expectedResult0 = asScala(expectedResult);
+        final kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult[] 
latestResult = {null};
+        TestUtils.waitUntilTrue(
+            () -> {
+                try {
+                    latestResult[0] = runVerifyAssignment(adminClient, 
jsonString, preserveThrottles);
+                } catch (ExecutionException | InterruptedException | 
JsonProcessingException e) {
+                    throw new RuntimeException(e);
+                }
+                return expectedResult0.equals(latestResult[0]);
+            }, () -> "Timed out waiting for verifyAssignment result " + 
expectedResult + ".  " +
+                "The latest result was " + latestResult[0], 
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 10L);
+    }
+
+    private void runExecuteAssignment(Admin adminClient,
+                                      Boolean additional,
+                                      String reassignmentJson,
+                                      Long interBrokerThrottle,
+                                      Long replicaAlterLogDirsThrottle) {
+        System.out.println("==> executeAssignment(adminClient, additional=" + 
additional + ", " +
+            "reassignmentJson=" + reassignmentJson + ", " +
+            "interBrokerThrottle=" + interBrokerThrottle + ", " +
+            "replicaAlterLogDirsThrottle=" + replicaAlterLogDirsThrottle + 
"))");
+        ReassignPartitionsCommand.executeAssignment(adminClient, additional, 
reassignmentJson,

Review Comment:
   Yes. fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to