chia7712 commented on code in PR #15675: URL: https://github.com/apache/kafka/pull/15675#discussion_r1615769194
########## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java: ########## @@ -0,0 +1,820 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.reassign; + +import com.fasterxml.jackson.core.JsonProcessingException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.ClusterTests; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeLogDirsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionReplica; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.config.QuotaConfigs; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.TerseException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC; +import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1; +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0; +import static org.apache.kafka.server.config.QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG; +import static org.apache.kafka.server.config.QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG; +import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG; +import static org.apache.kafka.tools.ToolsTestUtils.assignThrottledPartitionReplicas; +import static org.apache.kafka.tools.ToolsTestUtils.throttleAllBrokersReplication; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = { + @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"), + @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"), + @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"), + @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"), + @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"), + @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"), + @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"), +}) +@ExtendWith(ClusterTestExtensions.class) +public class ReassignPartitionsCommandTest { + private final ClusterInstance clusterInstance; + + ReassignPartitionsCommandTest(ClusterInstance clusterInstance) { + this.clusterInstance = clusterInstance; + } + + @ClusterTest + public void testReassignment() throws Exception { + createTopics(); + executeAndVerifyReassignment(); + } + + @ClusterTests({ + @ClusterTest(types = {Type.ZK}, metadataVersion = IBP_2_7_IV1), + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0) + }) + public void testReassignmentWithAlterPartitionDisabled() throws Exception { + // Test reassignment when the IBP is on an older version which does not use + // the `AlterPartition` API. In this case, the controller will register individual + // watches for each reassigning partition so that the reassignment can be + // completed as soon as the ISR is expanded. + createTopics(); + executeAndVerifyReassignment(); + } + + @ClusterTests({ + @ClusterTest(types = {Type.ZK}, serverProperties = { + @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"), + @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"), + @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"), + }), + @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { + @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"), + }) + }) + public void testReassignmentCompletionDuringPartialUpgrade() throws Exception { + // Test reassignment during a partial upgrade when some brokers are relying on + // `AlterPartition` and some rely on the old notification logic through Zookeeper. + // In this test case, broker 0 starts up first on the latest IBP and is typically + // elected as controller. The three remaining brokers start up on the older IBP. + // We want to ensure that reassignment can still complete through the ISR change + // notification path even though the controller expects `AlterPartition`. + + // Override change notification settings so that test is not delayed by ISR + // change notification delay + // ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500)); + + createTopics(); + executeAndVerifyReassignment(); + } + + @ClusterTest + public void testHighWaterMarkAfterPartitionReassignment() throws Exception { + createTopics(); + TopicPartition foo0 = new TopicPartition("foo", 0); + produceMessages(foo0.topic(), foo0.partition(), 100); + + // Execute the assignment + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + runExecuteAssignment(false, assignment, -1L, -1L); + + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + Map<TopicPartition, PartitionReassignmentState> finalAssignment = singletonMap(foo0, + new PartitionReassignmentState(asList(3, 1, 2), asList(3, 1, 2), true)); + // Wait for the assignment to complete + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + TestUtils.waitForCondition(() -> { + ListOffsetsResultInfo result = admin.listOffsets(Collections.singletonMap(foo0, new OffsetSpec.LatestSpec())).partitionResult(foo0).get(); + return result.offset() == 100; + }, "Timeout for waiting offset"); + } + } + + @ClusterTest + public void testAlterReassignmentThrottle() throws Exception { + createTopics(); + produceMessages("foo", 0, 50); + produceMessages("baz", 2, 60); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + // Execute the assignment with a low throttle + long initialThrottle = 1L; + runExecuteAssignment(false, assignment, initialThrottle, -1L); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), initialThrottle); + + // Now update the throttle and verify the reassignment completes + long updatedThrottle = 300000L; + runExecuteAssignment(true, assignment, updatedThrottle, -1L); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), updatedThrottle); + + Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>(); + finalAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true)); + finalAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true)); + + // Now remove the throttles. + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); + } + } + + /** + * Test running a reassignment with the interBrokerThrottle set. + */ + @ClusterTest + public void testThrottledReassignment() throws Exception { + createTopics(); + produceMessages("foo", 0, 50); + produceMessages("baz", 2, 60); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + + "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + + // Check that the assignment has not yet been started yet. + Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>(); + initialAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(asList(0, 1, 2), asList(0, 3, 2), true)); + initialAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(asList(0, 2, 1), asList(3, 2, 1), true)); + try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { + assertEquals(new VerifyAssignmentResult(initialAssignment), runVerifyAssignment(admin, assignment, false)); + assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet())); + + // Execute the assignment + long interBrokerThrottle = 300000L; + runExecuteAssignment(false, assignment, interBrokerThrottle, -1L); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + + Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>(); + finalAssignment.put(new TopicPartition("foo", 0), + new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true)); + finalAssignment.put(new TopicPartition("baz", 2), + new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true)); + + // Wait for the assignment to complete + TestUtils.waitForCondition(() -> { + // Check the reassignment status. + VerifyAssignmentResult result = runVerifyAssignment(admin, assignment, true); + + if (!result.partsOngoing) { + return true; + } else { + assertFalse( + result.partStates.values().stream().allMatch(state -> state.done), + "Expected at least one partition reassignment to be ongoing when result = " + result + ); + assertEquals(asList(0, 3, 2), result.partStates.get(new TopicPartition("foo", 0)).targetReplicas); + assertEquals(asList(3, 2, 1), result.partStates.get(new TopicPartition("baz", 2)).targetReplicas); + System.out.println("Current result: " + result); + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + return false; + } + }, "Expected reassignment to complete."); + + waitForVerifyAssignment(admin, assignment, true, + new VerifyAssignmentResult(finalAssignment)); + // The throttles should still have been preserved, since we ran with --preserve-throttles + waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle); + // Now remove the throttles. + waitForVerifyAssignment(admin, assignment, false, + new VerifyAssignmentResult(finalAssignment)); + waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs); + } + } + + @ClusterTest + public void testProduceAndConsumeWithReassignmentInProgress() throws Exception { + createTopics(); + produceMessages("baz", 2, 60); + String assignment = "{\"version\":1,\"partitions\":" + + "[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + + "]}"; + runExecuteAssignment(false, assignment, 300L, -1L); + produceMessages("baz", 2, 100); + + Properties consumerProps = new Properties(); + consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + TopicPartition part = new TopicPartition("baz", 2); + try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { + consumer.assign(singleton(part)); + List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>(); + TestUtils.waitForCondition(() -> { + ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L)); + System.out.println("records.count " + records.count()); Review Comment: Could we remove those debug messages? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org