jolshan commented on code in PR #14456:
URL: https://github.com/apache/kafka/pull/14456#discussion_r1341851404


##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java:
##########
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import kafka.admin.ReassignPartitionsCommand;
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.server.HostedPartition;
+import kafka.server.IsrChangePropagationConfig;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.server.ZkAlterPartitionManager;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.None$;
+import scala.Option;
+import scala.Some$;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("ClassFanOutComplexity")
+@Timeout(300)
+public class ReassignPartitionsIntegrationTest extends QuorumTestHarness {
+    ReassignPartitionsTestCluster cluster;
+
+    @AfterEach
+    @Override
+    public void tearDown() {
+        Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster");
+        super.tearDown();
+    }
+
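+    // Expected throttle configs for brokers 0-3 when no reassignment throttle is in
+    // effect: every broker-level throttle from
+    // ReassignPartitionsCommand.brokerLevelThrottles() mapped to -1L.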
+    private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = new HashMap<>(); {
+        IntStream.range(0, 4).forEach(brokerId -> {
+            Map<String, Long> brokerConfig = new HashMap<>();
+
+            ReassignPartitionsCommand.brokerLevelThrottles().foreach(throttle -> {
+                brokerConfig.put(throttle, -1L);
+                return null;
+            });
+
+            unthrottledBrokerConfigs.put(brokerId, brokerConfig);
+        });
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testReassignment(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
+        cluster.setup();
+        executeAndVerifyReassignment();
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
+    public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception {
+        // Test reassignment when the IBP is on an older version which does not use
+        // the `AlterPartition` API. In this case, the controller will register individual
+        // watches for each reassigning partition so that the reassignment can be
+        // completed as soon as the ISR is expanded.
+        Map<String, String> configOverrides = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
+        cluster = new ReassignPartitionsTestCluster(configOverrides, Collections.emptyMap());
+        cluster.setup();
+        executeAndVerifyReassignment();
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition
+    public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception {
+        // Test reassignment during a partial upgrade when some brokers rely on
+        // `AlterPartition` and some rely on the old notification logic through ZooKeeper.
+        // In this test case, broker 0 starts up first on the latest IBP and is typically
+        // elected as controller. The three remaining brokers start up on the older IBP.
+        // We want to ensure that reassignment can still complete through the ISR change
+        // notification path even though the controller expects `AlterPartition`.
+
+        // Override change notification settings so that the test is not delayed by the ISR
+        // change notification delay.
+        ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));
+
+        Map<String, String> oldIbpConfig = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version());
+        Map<Integer, Map<String, String>> brokerConfigOverrides = new HashMap<>();
+        brokerConfigOverrides.put(1, oldIbpConfig);
+        brokerConfigOverrides.put(2, oldIbpConfig);
+        brokerConfigOverrides.put(3, oldIbpConfig);
+
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), brokerConfigOverrides);
+        cluster.setup();
+
+        executeAndVerifyReassignment();
+    }
+
+    private void executeAndVerifyReassignment() throws ExecutionException, InterruptedException, JsonProcessingException {
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
+            "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        TopicPartition bar0 = new TopicPartition("bar", 0);
+
+        // Check that the assignment has not been started yet.
+        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
+
+        initialAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true));
+        initialAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true));
+
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(initialAssignment));
+
+        // Execute the assignment
+        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
+        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
+        finalAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true));
+        finalAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true));
+
+        kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false);
+        assertFalse(verifyAssignmentResult.movesOngoing());
+
+        // Wait for the assignment to complete
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+
+        assertEquals(unthrottledBrokerConfigs,
+            describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+
+        // Verify that replicas are removed from brokers that are no longer assigned
+        verifyReplicaDeleted(foo0, 2);
+        verifyReplicaDeleted(bar0, 1);
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
+        cluster.setup();
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        // Set the high water mark of foo-0 to 123 on its leader.
+        TopicPartition part = new TopicPartition("foo", 0);
+        cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, None$.empty());
+
+        // Execute the assignment
+        runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L);
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part,
+            new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true));
+
+        // Wait for the assignment to complete
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+
+        TestUtils.waitUntilTrue(() ->
+                cluster.servers.get(3).replicaManager().onlinePartition(part).
+                    map(Partition::leaderLogIfLocal).isDefined(),
+            () -> "broker 3 should be the new leader", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 10L);
+        assertEquals(123L, cluster.servers.get(3).replicaManager().localLogOrException(part).highWatermark(),
+            "Expected broker 3 to have the correct high water mark for the partition.");
+    }
+
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testAlterReassignmentThrottle(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages("foo", 0, 50);
+        cluster.produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
+            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        // Execute the assignment with a low throttle
+        long initialThrottle = 1L;
+        runExecuteAssignment(cluster.adminClient, false, assignment, initialThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle);
+
+        // Now update the throttle and verify the reassignment completes
+        long updatedThrottle = 300000L;
+        runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle);
+
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
+        finalAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
+        finalAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
+
+        // Now remove the throttles.
+        waitForVerifyAssignment(cluster.adminClient, assignment, false,
+            new VerifyAssignmentResult(finalAssignment));
+        waitForBrokerLevelThrottles(unthrottledBrokerConfigs);
+    }
+
+    /**
+     * Test running a reassignment with the interBrokerThrottle set.
+     */
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"zk", "kraft"})
+    public void testThrottledReassignment(String quorum) throws Exception {
+        cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap());
+        cluster.setup();
+        cluster.produceMessages("foo", 0, 50);
+        cluster.produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
+            "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        // Check that the assignment has not been started yet.
+        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
+        initialAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true));
+        initialAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true));
+        assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), runVerifyAssignment(cluster.adminClient, assignment, false));
+        assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet()));
+
+        // Execute the assignment
+        long interBrokerThrottle = 300000L;
+        runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L);
+        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
+
+        Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
+        finalAssignment.put(new TopicPartition("foo", 0),
+            new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true));
+        finalAssignment.put(new TopicPartition("baz", 2),
+            new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true));
+
+        // Wait for the assignment to complete
+        TestUtils.waitUntilTrue(
+            () -> {
+                try {
+                    // Check the reassignment status.
+                    kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true);
+
+                    if (!result.partsOngoing()) {
+                        return true;
+                    } else {
+                        assertFalse(
+                            result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done),
+                            "Expected at least one partition reassignment to be ongoing when result = " + result
+                        );
+                        assertEquals(seq(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).get().targetReplicas());
+                        assertEquals(seq(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).get().targetReplicas());
+                        System.out.println("Current result: " + result);
+                        waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle);
+                        return false;
+                    }
+                } catch (ExecutionException | InterruptedException | JsonProcessingException e) {

Review Comment:
   I see it has been removed.


