chia7712 commented on code in PR #15675:
URL: https://github.com/apache/kafka/pull/15675#discussion_r1615769194


##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -0,0 +1,820 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.ClusterTests;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.config.QuotaConfigs;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tools.TerseException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
+import static org.apache.kafka.server.config.QuotaConfigs.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
+import static org.apache.kafka.server.config.QuotaConfigs.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG;
+import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
+import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG;
+import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG;
+import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG;
+import static org.apache.kafka.tools.ToolsTestUtils.assignThrottledPartitionReplicas;
+import static org.apache.kafka.tools.ToolsTestUtils.throttleAllBrokersReplication;
+import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES;
+import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment;
+import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment;
+import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(brokers = 5, disksPerBroker = 3, serverProperties = {
+        @ClusterConfigProperty(key = REPLICA_FETCH_BACKOFF_MS_CONFIG, value = "100"),
+        @ClusterConfigProperty(key = AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
+        @ClusterConfigProperty(key = REPLICA_LAG_TIME_MAX_MS_CONFIG, value = "1000"),
+        @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
+        @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack0"),
+        @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack1"),
+        @ClusterConfigProperty(id = 3, key = "broker.rack", value = "rack1"),
+        @ClusterConfigProperty(id = 4, key = "broker.rack", value = "rack1"),
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class ReassignPartitionsCommandTest {
+    private final ClusterInstance clusterInstance;
+
+    ReassignPartitionsCommandTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    @ClusterTest
+    public void testReassignment() throws Exception {
+        createTopics();
+        executeAndVerifyReassignment();
+    }
+
+    @ClusterTests({
+            @ClusterTest(types = {Type.ZK}, metadataVersion = IBP_2_7_IV1),
+            @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, metadataVersion = IBP_3_3_IV0)
+    })
+    public void testReassignmentWithAlterPartitionDisabled() throws Exception {
+        // Test reassignment when the IBP is on an older version which does not use
+        // the `AlterPartition` API. In this case, the controller will register individual
+        // watches for each reassigning partition so that the reassignment can be
+        // completed as soon as the ISR is expanded.
+        createTopics();
+        executeAndVerifyReassignment();
+    }
+
+    @ClusterTests({
+            @ClusterTest(types = {Type.ZK}, serverProperties = {
+                    @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
+                    @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
+                    @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "2.7-IV1"),
+            }),
+            @ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = {
+                    @ClusterConfigProperty(id = 1, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+                    @ClusterConfigProperty(id = 2, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+                    @ClusterConfigProperty(id = 3, key = INTER_BROKER_PROTOCOL_VERSION_CONFIG, value = "3.3-IV0"),
+            })
+    })
+    public void testReassignmentCompletionDuringPartialUpgrade() throws Exception {
+        // Test reassignment during a partial upgrade when some brokers are relying on
+        // `AlterPartition` and some rely on the old notification logic through Zookeeper.
+        // In this test case, broker 0 starts up first on the latest IBP and is typically
+        // elected as controller. The three remaining brokers start up on the older IBP.
+        // We want to ensure that reassignment can still complete through the ISR change
+        // notification path even though the controller expects `AlterPartition`.
+
+        // Override the change notification settings so that the test is not delayed by the
+        // ISR change notification delay.
+        // ZkAlterPartitionManager.DefaultIsrPropagationConfig_$eq(new IsrChangePropagationConfig(500, 100, 500));
+
+        createTopics();
+        executeAndVerifyReassignment();
+    }
+
+    @ClusterTest
+    public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
+        createTopics();
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 100);
+
+        // Execute the assignment
+        String assignment = "{\"version\":1,\"partitions\":" +
+                "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+                "]}";
+        runExecuteAssignment(false, assignment, -1L, -1L);
+
+        try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
+            Map<TopicPartition, PartitionReassignmentState> finalAssignment = singletonMap(foo0,
+                    new PartitionReassignmentState(asList(3, 1, 2), asList(3, 1, 2), true));
+            // Wait for the assignment to complete
+            waitForVerifyAssignment(admin, assignment, false,
+                    new VerifyAssignmentResult(finalAssignment));
+            TestUtils.waitForCondition(() -> {
+                ListOffsetsResultInfo result = admin.listOffsets(Collections.singletonMap(foo0, new OffsetSpec.LatestSpec())).partitionResult(foo0).get();
+                return result.offset() == 100;
+            }, "Timed out waiting for the latest offset of foo-0 to reach 100");
+        }
+    }
+
+    @ClusterTest
+    public void testAlterReassignmentThrottle() throws Exception {
+        createTopics();
+        produceMessages("foo", 0, 50);
+        produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+                "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
+                "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+                "]}";
+
+        try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
+            // Execute the assignment with a low throttle
+            long initialThrottle = 1L;
+            runExecuteAssignment(false, assignment, initialThrottle, -1L);
+            waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), initialThrottle);
+
+            // Now update the throttle and verify the reassignment completes
+            long updatedThrottle = 300000L;
+            runExecuteAssignment(true, assignment, updatedThrottle, -1L);
+            waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), updatedThrottle);
+
+            Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
+            finalAssignment.put(new TopicPartition("foo", 0),
+                    new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true));
+            finalAssignment.put(new TopicPartition("baz", 2),
+                    new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true));
+
+            // Now remove the throttles.
+            waitForVerifyAssignment(admin, assignment, false,
+                    new VerifyAssignmentResult(finalAssignment));
+            waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
+        }
+    }
+
+    /**
+     * Test running a reassignment with the interBrokerThrottle set.
+     */
+    @ClusterTest
+    public void testThrottledReassignment() throws Exception {
+        createTopics();
+        produceMessages("foo", 0, 50);
+        produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+                "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,3,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
+                "{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+                "]}";
+
+        // Check that the assignment has not been started yet.
+        Map<TopicPartition, PartitionReassignmentState> initialAssignment = new HashMap<>();
+        initialAssignment.put(new TopicPartition("foo", 0),
+                new PartitionReassignmentState(asList(0, 1, 2), asList(0, 3, 2), true));
+        initialAssignment.put(new TopicPartition("baz", 2),
+                new PartitionReassignmentState(asList(0, 2, 1), asList(3, 2, 1), true));
+        try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
+            assertEquals(new VerifyAssignmentResult(initialAssignment), runVerifyAssignment(admin, assignment, false));
+            assertEquals(unthrottledBrokerConfigs, describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
+
+            // Execute the assignment
+            long interBrokerThrottle = 300000L;
+            runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
+            waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
+
+            Map<TopicPartition, PartitionReassignmentState> finalAssignment = new HashMap<>();
+            finalAssignment.put(new TopicPartition("foo", 0),
+                    new PartitionReassignmentState(asList(0, 3, 2), asList(0, 3, 2), true));
+            finalAssignment.put(new TopicPartition("baz", 2),
+                    new PartitionReassignmentState(asList(3, 2, 1), asList(3, 2, 1), true));
+
+            // Wait for the assignment to complete
+            TestUtils.waitForCondition(() -> {
+                // Check the reassignment status.
+                VerifyAssignmentResult result = runVerifyAssignment(admin, assignment, true);
+
+                if (!result.partsOngoing) {
+                    return true;
+                } else {
+                    assertFalse(
+                            result.partStates.values().stream().allMatch(state -> state.done),
+                            "Expected at least one partition reassignment to be ongoing when result = " + result
+                    );
+                    assertEquals(asList(0, 3, 2), result.partStates.get(new TopicPartition("foo", 0)).targetReplicas);
+                    assertEquals(asList(3, 2, 1), result.partStates.get(new TopicPartition("baz", 2)).targetReplicas);
+                    System.out.println("Current result: " + result);
+                    waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
+                    return false;
+                }
+            }, "Expected reassignment to complete.");
+
+            waitForVerifyAssignment(admin, assignment, true,
+                    new VerifyAssignmentResult(finalAssignment));
+            // The throttles should still have been preserved, since we ran with --preserve-throttles
+            waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
+            // Now remove the throttles.
+            waitForVerifyAssignment(admin, assignment, false,
+                    new VerifyAssignmentResult(finalAssignment));
+            waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
+        }
+    }
+
+    @ClusterTest
+    public void testProduceAndConsumeWithReassignmentInProgress() throws Exception {
+        createTopics();
+        produceMessages("baz", 2, 60);
+        String assignment = "{\"version\":1,\"partitions\":" +
+                "[{\"topic\":\"baz\",\"partition\":2,\"replicas\":[3,2,1],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
+                "]}";
+        runExecuteAssignment(false, assignment, 300L, -1L);
+        produceMessages("baz", 2, 100);
+
+        Properties consumerProps = new Properties();
+        consumerProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
+        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        TopicPartition part = new TopicPartition("baz", 2);
+        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            consumer.assign(singleton(part));
+            List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
+            TestUtils.waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L));
+                System.out.println("records.count " + records.count());

Review Comment:
   Could we remove those debug messages?
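   For instance, the polling loop could drop the `System.out.println` calls and let the `waitForCondition` timeout message carry the diagnostics. A rough sketch only — the rest of this loop is outside the hunk, so collecting into `allRecords` and the expected count of 160 (the 60 + 100 records produced to baz-2 above) are my assumptions:
   ```java
   TestUtils.waitForCondition(() -> {
       ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L));
       // accumulate what has been consumed so far instead of printing progress
       records.forEach(allRecords::add);
       // 60 + 100 records were produced to baz-2 earlier in the test (assumed target)
       return allRecords.size() >= 160;
   }, "Timed out waiting to consume all records produced to baz-2");
   ```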



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
