chia7712 commented on code in PR #21175:
URL: https://github.com/apache/kafka/pull/21175#discussion_r2637082859


##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,544 @@ private Admin createAdminClient(KafkaClusterTestKit 
cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName()), usingBootstrapControllers);
     }
 
+    @Test
+    public void testCreateClusterAndPerformReassignment() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                // Create the topic.
+                Map<Integer, List<Integer>> assignments = Map.of(
+                    0, List.of(0, 1, 2),
+                    1, List.of(1, 2, 3),
+                    2, List.of(2, 3, 0),
+                    3, List.of(3, 2, 1)
+                );
+
+                CreateTopicsResult createTopicResult = 
admin.createTopics(List.of(
+                    new NewTopic("foo", assignments)));
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of("foo"), List.of());
+
+                // Start some reassignments.
+                assertEquals(Map.of(), 
admin.listPartitionReassignments().reassignments().get());
+                Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments = Map.of(
+                    new TopicPartition("foo", 0), Optional.of(new 
NewPartitionReassignment(List.of(2, 1, 0))),
+                    new TopicPartition("foo", 1), Optional.of(new 
NewPartitionReassignment(List.of(0, 1, 2))),
+                    new TopicPartition("foo", 2), Optional.of(new 
NewPartitionReassignment(List.of(2, 3))),
+                    new TopicPartition("foo", 3), Optional.of(new 
NewPartitionReassignment(List.of(3, 2, 0, 1)))
+                );
+                admin.alterPartitionReassignments(reassignments).all().get();
+                TestUtils.waitForCondition(
+                    () -> 
admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                    "The reassignment never completed."
+                );
+                AtomicReference<List<List<Integer>>> currentMapping = new 
AtomicReference<>(List.of());
+                List<List<Integer>> expectedMapping = List.of(
+                    List.of(2, 1, 0),
+                    List.of(0, 1, 2),
+                    List.of(2, 3),
+                    List.of(3, 2, 0, 1)
+                );
+                TestUtils.waitForCondition(() -> {
+                    Map<String, TopicDescription> topicInfoMap = 
admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                    if (topicInfoMap.containsKey("foo")) {
+                        
currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                        return expectedMapping.equals(currentMapping.get());
+                    } else {
+                        return false;
+                    }
+                }, () -> "Timed out waiting for replica assignments for topic 
foo. " +
+                    "Wanted: " + expectedMapping + ". Got: " + 
currentMapping.get());
+
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> 
checkReplicaManager(
+                    cluster,
+                    Map.of(
+                        0, List.of(true, true, false, true),
+                        1, List.of(true, true, false, true),
+                        2, List.of(true, true, true, true),
+                        3, List.of(false, false, true, true)
+                    )
+                ));
+            }
+        }
+    }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, 
List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : 
expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); 
partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", 
partitionId);
+                var partition = 
broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    
assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition, 
"topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, 
partition, "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> 
translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+        return partitions.stream()
+            .map(partition -> partition.replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toList()))
+            .collect(Collectors.toList());
+    }
+
+    @Test
+    public void testIncrementalAlterConfigs() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)

Review Comment:
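   It appears that 3 brokers are sufficient for this test
   (`testIncrementalAlterConfigs`): the topics it creates use a replication
   factor of 3, and the highest broker id it references is 2.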



##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -65,31 +84,42 @@
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.kafka.server.IntegrationTestUtils.connectAndReceive;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(120)
 @Tag("integration")
 public class KRaftClusterTest {
+    private static final Logger log = 
LoggerFactory.getLogger(KRaftClusterTest.class);

Review Comment:
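   Please rename `log` to `LOG`: the field is a `private static final`
   constant, so it should follow the upper-case naming used for constants.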



##########
server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java:
##########
@@ -644,6 +674,544 @@ private Admin createAdminClient(KafkaClusterTestKit 
cluster, boolean usingBootst
         return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName()), usingBootstrapControllers);
     }
 
+    @Test
+    public void testCreateClusterAndPerformReassignment() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                // Create the topic.
+                Map<Integer, List<Integer>> assignments = Map.of(
+                    0, List.of(0, 1, 2),
+                    1, List.of(1, 2, 3),
+                    2, List.of(2, 3, 0),
+                    3, List.of(3, 2, 1)
+                );
+
+                CreateTopicsResult createTopicResult = 
admin.createTopics(List.of(
+                    new NewTopic("foo", assignments)));
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of("foo"), List.of());
+
+                // Start some reassignments.
+                assertEquals(Map.of(), 
admin.listPartitionReassignments().reassignments().get());
+                Map<TopicPartition, Optional<NewPartitionReassignment>> 
reassignments = Map.of(
+                    new TopicPartition("foo", 0), Optional.of(new 
NewPartitionReassignment(List.of(2, 1, 0))),
+                    new TopicPartition("foo", 1), Optional.of(new 
NewPartitionReassignment(List.of(0, 1, 2))),
+                    new TopicPartition("foo", 2), Optional.of(new 
NewPartitionReassignment(List.of(2, 3))),
+                    new TopicPartition("foo", 3), Optional.of(new 
NewPartitionReassignment(List.of(3, 2, 0, 1)))
+                );
+                admin.alterPartitionReassignments(reassignments).all().get();
+                TestUtils.waitForCondition(
+                    () -> 
admin.listPartitionReassignments().reassignments().get().isEmpty(),
+                    "The reassignment never completed."
+                );
+                AtomicReference<List<List<Integer>>> currentMapping = new 
AtomicReference<>(List.of());
+                List<List<Integer>> expectedMapping = List.of(
+                    List.of(2, 1, 0),
+                    List.of(0, 1, 2),
+                    List.of(2, 3),
+                    List.of(3, 2, 0, 1)
+                );
+                TestUtils.waitForCondition(() -> {
+                    Map<String, TopicDescription> topicInfoMap = 
admin.describeTopics(Set.of("foo")).allTopicNames().get();
+                    if (topicInfoMap.containsKey("foo")) {
+                        
currentMapping.set(translatePartitionInfoToSeq(topicInfoMap.get("foo").partitions()));
+                        return expectedMapping.equals(currentMapping.get());
+                    } else {
+                        return false;
+                    }
+                }, () -> "Timed out waiting for replica assignments for topic 
foo. " +
+                    "Wanted: " + expectedMapping + ". Got: " + 
currentMapping.get());
+
+                TestUtils.retryOnExceptionWithTimeout(60000, () -> 
checkReplicaManager(
+                    cluster,
+                    Map.of(
+                        0, List.of(true, true, false, true),
+                        1, List.of(true, true, false, true),
+                        2, List.of(true, true, true, true),
+                        3, List.of(false, false, true, true)
+                    )
+                ));
+            }
+        }
+    }
+
+    private void checkReplicaManager(KafkaClusterTestKit cluster, Map<Integer, 
List<Boolean>> expectedHosting) {
+        for (Map.Entry<Integer, List<Boolean>> entry : 
expectedHosting.entrySet()) {
+            int brokerId = entry.getKey();
+            List<Boolean> partitionsIsHosted = entry.getValue();
+            var broker = cluster.brokers().get(brokerId);
+
+            for (int partitionId = 0; partitionId < partitionsIsHosted.size(); 
partitionId++) {
+                boolean isHosted = partitionsIsHosted.get(partitionId);
+                TopicPartition topicPartition = new TopicPartition("foo", 
partitionId);
+                var partition = 
broker.replicaManager().getPartition(topicPartition);
+                if (isHosted) {
+                    
assertNotEquals(kafka.server.HostedPartition.None$.MODULE$, partition, 
"topicPartition = " + topicPartition);
+                } else {
+                    assertEquals(kafka.server.HostedPartition.None$.MODULE$, 
partition, "topicPartition = " + topicPartition);
+                }
+            }
+        }
+    }
+
+    private List<List<Integer>> 
translatePartitionInfoToSeq(List<TopicPartitionInfo> partitions) {
+        return partitions.stream()
+            .map(partition -> partition.replicas().stream()
+                .map(Node::id)
+                .collect(Collectors.toList()))
+            .collect(Collectors.toList());
+    }
+
+    @Test
+    public void testIncrementalAlterConfigs() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                Map<ConfigResource, Collection<AlterConfigOp>> brokerConfigs = 
Map.of(
+                    new ConfigResource(Type.BROKER, ""),
+                    List.of(
+                        new AlterConfigOp(new ConfigEntry("log.roll.ms", 
"1234567"), AlterConfigOp.OpType.SET),
+                        new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "60"), AlterConfigOp.OpType.SET)
+                    )
+                );
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
brokerConfigs));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, ""), Map.of(
+                        "log.roll.ms", "1234567",
+                        "max.connections.per.ip", "60",
+                        "min.insync.replicas", "1"
+                    )), true);
+
+                admin.createTopics(List.of(
+                    new NewTopic("foo", 2, (short) 3),
+                    new NewTopic("bar", 2, (short) 3)
+                )).all().get();
+                waitForAllPartitions(cluster, "foo", 2);
+                waitForAllPartitions(cluster, "bar", 2);
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "bar"), Map.of()
+                ), false);
+
+                assertListEquals(List.of(ApiError.NONE,
+                    new ApiError(Errors.INVALID_CONFIG, "Unknown topic config 
name: not.a.real.topic.config"),
+                    new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The topic 
'baz' does not exist.")),
+                    incrementalAlter(admin, Map.of(
+                        new ConfigResource(Type.TOPIC, "foo"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "345"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "bar"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("not.a.real.topic.config", "789"), AlterConfigOp.OpType.SET)),
+                        new ConfigResource(Type.TOPIC, "baz"),
+                        List.of(new AlterConfigOp(new 
ConfigEntry("segment.jitter.ms", "678"), AlterConfigOp.OpType.SET))
+                    )));
+
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.TOPIC, "foo"), 
Map.of("segment.jitter.ms", "345")
+                ), false);
+                assertEquals(List.of(ApiError.NONE), incrementalAlter(admin, 
Map.of(
+                    new ConfigResource(Type.BROKER, "2"),
+                    List.of(new AlterConfigOp(new 
ConfigEntry("max.connections.per.ip", "7"), AlterConfigOp.OpType.SET))
+                )));
+                validateConfigs(admin, Map.of(
+                    new ConfigResource(Type.BROKER, "2"), 
Map.of("max.connections.per.ip", "7")
+                ), false);
+            }
+        }
+    }
+
+    private void waitForAllPartitions(KafkaClusterTestKit cluster, String 
topic, int expectedNumPartitions)
+        throws InterruptedException {
+        TestUtils.waitForCondition(() -> 
cluster.brokers().values().stream().allMatch(broker -> {
+            Optional<Integer> numPartitionsOpt = 
broker.metadataCache().numPartitions(topic);
+            if (expectedNumPartitions == 0) {
+                return numPartitionsOpt.isEmpty();
+            } else {
+                return numPartitionsOpt.isPresent() && numPartitionsOpt.get() 
== expectedNumPartitions;
+            }
+        }), 60000L, "Topic [" + topic + "] metadata not propagated after 60000 
ms");
+    }
+
+    private List<ApiError> incrementalAlter(Admin admin, Map<ConfigResource, 
Collection<AlterConfigOp>> changes) {
+        Map<ConfigResource, KafkaFuture<Void>> values = 
admin.incrementalAlterConfigs(changes).values();
+        return changes.keySet().stream().map(resource -> {
+            try {
+                values.get(resource).get();
+                return ApiError.NONE;
+            } catch (ExecutionException e) {
+                return ApiError.fromThrowable(e.getCause());
+            } catch (Throwable t) {
+                return ApiError.fromThrowable(t);
+            }
+        }).collect(Collectors.toList());
+    }
+
+    private Map<ConfigResource, Map<String, String>> validateConfigs(
+        Admin admin,
+        Map<ConfigResource, Map<String, String>> expected,
+        boolean exhaustive
+    ) throws Exception {
+        Map<ConfigResource, Map<String, String>> results = new HashMap<>();
+        TestUtils.retryOnExceptionWithTimeout(60000, () -> {
+            try {
+                var values = admin.describeConfigs(expected.keySet()).values();
+                results.clear();
+                assertEquals(expected.keySet(), values.keySet());
+                for (Map.Entry<ConfigResource, Map<String, String>> entry : 
expected.entrySet()) {
+                    ConfigResource resource = entry.getKey();
+                    Map<String, String> expectedPairs = entry.getValue();
+                    var config = values.get(resource).get();
+                    Map<String, String> actualMap = new TreeMap<>();
+                    Map<String, String> expectedMap = new TreeMap<>();
+                    config.entries().forEach(configEntry -> {
+                        actualMap.put(configEntry.name(), configEntry.value());
+                        if (!exhaustive) {
+                            expectedMap.put(configEntry.name(), 
configEntry.value());
+                        }
+                    });
+                    expectedMap.putAll(expectedPairs);
+                    assertEquals(expectedMap, actualMap);
+                    results.put(resource, actualMap);
+                }
+            } catch (Exception t) {
+                log.warn("Unable to describeConfigs({})", expected.keySet(), 
t);
+                throw t;
+            }
+        });
+        return results;
+    }
+
+    @Test
+    public void testSetLog4jConfigurations() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+
+            try (Admin admin = cluster.admin()) {
+                log.debug("setting log4j");
+                LOG_2.debug("setting log4j");
+
+                ConfigResource broker2 = new 
ConfigResource(Type.BROKER_LOGGER, "2");

Review Comment:
   We could scale down the broker count for this test
   (`testSetLog4jConfigurations`), right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to