jolshan commented on code in PR #14456: URL: https://github.com/apache/kafka/pull/14456#discussion_r1341851404
########## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java: ########## @@ -0,0 +1,924 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.reassign; + +import com.fasterxml.jackson.core.JsonProcessingException; +import kafka.admin.ReassignPartitionsCommand; +import kafka.cluster.Partition; +import kafka.log.UnifiedLog; +import kafka.server.HostedPartition; +import kafka.server.IsrChangePropagationConfig; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.server.QuorumTestHarness; +import kafka.server.ZkAlterPartitionManager; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import 
org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import scala.None$; +import scala.Option; +import scala.Some$; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("ClassFanOutComplexity") +@Timeout(300) +public class ReassignPartitionsIntegrationTest extends QuorumTestHarness { + ReassignPartitionsTestCluster cluster; + + @AfterEach + @Override + public void tearDown() { + Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster"); + super.tearDown(); + } + + private final Map<Integer, Map<String, Long>> unthrottledBrokerConfigs = new HashMap<>(); { + IntStream.range(0, 4).forEach(brokerId -> { + Map<String, Long> brokerConfig = new HashMap<>(); 
+ + ReassignPartitionsCommand.brokerLevelThrottles().foreach(throttle -> { + brokerConfig.put(throttle, -1L); + return null; + }); + + unthrottledBrokerConfigs.put(brokerId, brokerConfig); + }); + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"zk", "kraft"}) + public void testReassignment(String quorum) throws Exception { + cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap()); + cluster.setup(); + executeAndVerifyReassignment(); + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition + public void testReassignmentWithAlterPartitionDisabled(String quorum) throws Exception { + // Test reassignment when the IBP is on an older version which does not use + // the `AlterPartition` API. In this case, the controller will register individual + // watches for each reassigning partition so that the reassignment can be + // completed as soon as the ISR is expanded. + Map<String, String> configOverrides = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version()); + cluster = new ReassignPartitionsTestCluster(configOverrides, Collections.emptyMap()); + cluster.setup(); + executeAndVerifyReassignment(); + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = "zk") // Note: KRaft requires AlterPartition + public void testReassignmentCompletionDuringPartialUpgrade(String quorum) throws Exception { + // Test reassignment during a partial upgrade when some brokers are relying on + // `AlterPartition` and some rely on the old notification logic through Zookeeper. + // In this test case, broker 0 starts up first on the latest IBP and is typically + // elected as controller. The three remaining brokers start up on the older IBP. 
+ // We want to ensure that reassignment can still complete through the ISR change + // notification path even though the controller expects `AlterPartition`. + + // Override change notification settings so that test is not delayed by ISR + // change notification delay + ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500)); + + Map<String, String> oldIbpConfig = Collections.singletonMap(KafkaConfig.InterBrokerProtocolVersionProp(), IBP_2_7_IV1.version()); + Map<Integer, Map<String, String>> brokerConfigOverrides = new HashMap<>(); + brokerConfigOverrides.put(1, oldIbpConfig); + brokerConfigOverrides.put(2, oldIbpConfig); + brokerConfigOverrides.put(3, oldIbpConfig); + + cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), brokerConfigOverrides); + cluster.setup(); + + executeAndVerifyReassignment(); + } + + private void executeAndVerifyReassignment() throws ExecutionException, InterruptedException, JsonProcessingException { + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"bar\",\"partition\":0,\"replicas\":[3,2,0],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + TopicPartition foo0 = new TopicPartition("foo", 0); + TopicPartition bar0 = new TopicPartition("bar", 0); + + // Check that the assignment has not been started yet. 
+ Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>(); + + initialAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 1, 3), true)); + initialAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 0), true)); + + waitForVerifyAssignment(cluster.adminClient, assignment, false, + new VerifyAssignmentResult(initialAssignment)); + + // Execute the assignment + runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L); + assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet())); + Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>(); + finalAssignment.put(foo0, new PartitionReassignmentState(Arrays.asList(0, 1, 3), Arrays.asList(0, 1, 3), true)); + finalAssignment.put(bar0, new PartitionReassignmentState(Arrays.asList(3, 2, 0), Arrays.asList(3, 2, 0), true)); + + kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult verifyAssignmentResult = runVerifyAssignment(cluster.adminClient, assignment, false); + assertFalse(verifyAssignmentResult.movesOngoing()); + + // Wait for the assignment to complete + waitForVerifyAssignment(cluster.adminClient, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + + assertEquals(unthrottledBrokerConfigs, + describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet())); + + // Verify that partitions are removed from brokers no longer assigned + verifyReplicaDeleted(foo0, 2); + verifyReplicaDeleted(bar0, 1); + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"zk", "kraft"}) + public void testHighWaterMarkAfterPartitionReassignment(String quorum) throws Exception { + cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap()); + cluster.setup(); + String assignment = "{\"version\":1,\"partitions\":" + + 
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + // Set the high water mark of foo-0 to 123 on its leader. + TopicPartition part = new TopicPartition("foo", 0); + cluster.servers.get(0).replicaManager().logManager().truncateFullyAndStartAt(part, 123L, false, None$.empty()); + + // Execute the assignment + runExecuteAssignment(cluster.adminClient, false, assignment, -1L, -1L); + Map<TopicPartition, PartitionReassignmentState> finalAssignment = Collections.singletonMap(part, + new PartitionReassignmentState(Arrays.asList(3, 1, 2), Arrays.asList(3, 1, 2), true)); + + // Wait for the assignment to complete + waitForVerifyAssignment(cluster.adminClient, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + + TestUtils.waitUntilTrue(() -> + cluster.servers.get(3).replicaManager().onlinePartition(part). + map(Partition::leaderLogIfLocal).isDefined(), + () -> "broker 3 should be the new leader", org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 10L); + assertEquals(123L, cluster.servers.get(3).replicaManager().localLogOrException(part).highWatermark(), + "Expected broker 3 to have the correct high water mark for the partition."); + } + + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"zk", "kraft"}) + public void testAlterReassignmentThrottle(String quorum) throws Exception { + cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap()); + cluster.setup(); + cluster.produceMessages("foo", 0, 50); + cluster.produceMessages("baz", 2, 60); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + // Execute the assignment with a low throttle + long initialThrottle = 1L; + runExecuteAssignment(cluster.adminClient, 
false, assignment, initialThrottle, -1L); + waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), initialThrottle); + + // Now update the throttle and verify the reassignment completes + long updatedThrottle = 300000L; + runExecuteAssignment(cluster.adminClient, true, assignment, updatedThrottle, -1L); + waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), updatedThrottle); + + Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>(); + finalAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true)); + finalAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true)); + + // Now remove the throttles. + waitForVerifyAssignment(cluster.adminClient, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + waitForBrokerLevelThrottles(unthrottledBrokerConfigs); + } + + /** + * Test running a reassignment with the interBrokerThrottle set. + */ + @ParameterizedTest(name = "{displayName}.quorum={0}") + @ValueSource(strings = {"zk", "kraft"}) + public void testThrottledReassignment(String quorum) throws Exception { + cluster = new ReassignPartitionsTestCluster(Collections.emptyMap(), Collections.emptyMap()); + cluster.setup(); + cluster.produceMessages("foo", 0, 50); + cluster.produceMessages("baz", 2, 60); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + // Check that the assignment has not been started yet. 
+ Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>(); + initialAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(Arrays.asList(0, 1, 2), Arrays.asList(0, 3, 2), true)); + initialAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(Arrays.asList(0, 2, 1), Arrays.asList(3, 2, 1), true)); + assertEquals(asScala(new VerifyAssignmentResult(initialAssignment)), runVerifyAssignment(cluster.adminClient, assignment, false)); + assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(unthrottledBrokerConfigs.keySet())); + + // Execute the assignment + long interBrokerThrottle = 300000L; + runExecuteAssignment(cluster.adminClient, false, assignment, interBrokerThrottle, -1L); + waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle); + + Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>(); + finalAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(Arrays.asList(0, 3, 2), Arrays.asList(0, 3, 2), true)); + finalAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(Arrays.asList(3, 2, 1), Arrays.asList(3, 2, 1), true)); + + // Wait for the assignment to complete + TestUtils.waitUntilTrue( + () -> { + try { + // Check the reassignment status. 
+ kafka.admin.ReassignPartitionsCommand.VerifyAssignmentResult result = runVerifyAssignment(cluster.adminClient, assignment, true); + + if (!result.partsOngoing()) { + return true; + } else { + assertFalse( + result.partStates().values().forall(ReassignPartitionsCommand.PartitionReassignmentState::done), + "Expected at least one partition reassignment to be ongoing when result = " + result + ); + assertEquals(seq(0, 3, 2), result.partStates().get(new TopicPartition("foo", 0)).get().targetReplicas()); + assertEquals(seq(3, 2, 1), result.partStates().get(new TopicPartition("baz", 2)).get().targetReplicas()); + System.out.println("Current result: " + result); + waitForInterBrokerThrottle(Arrays.asList(0, 1, 2, 3), interBrokerThrottle); + return false; + } + } catch (ExecutionException | InterruptedException | JsonProcessingException e) { Review Comment: I see it has been removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org