chia7712 commented on code in PR #15715: URL: https://github.com/apache/kafka/pull/15715#discussion_r1590605162
########## core/src/test/java/kafka/test/junit/ClusterTestExtensions.java: ########## @@ -143,11 +143,12 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType(); Map<String, String> serverProperties = new HashMap<>(); + Map<Integer, Map<String, String>> perServerProperties = new HashMap<>(); Review Comment: Maybe we can rewrite them by lambda. ```java Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties())) .filter(e -> e.id() == -1) .collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)); Map<Integer, Map<String, String>> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties())) .filter(e -> e.id() != -1) .collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(), Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b)))); ``` ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -105,17 +109,27 @@ public TestKitNodes build() { List<Integer> controllerNodeIds = IntStream.range(startControllerId(), startControllerId() + numControllerNodes) .boxed() .collect(Collectors.toList()); - List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes) + List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, BROKER_ID_OFFSET + numBrokerNodes) .boxed() .collect(Collectors.toList()); + Set<Integer> unknownIds = perServerProperties.keySet().stream() Review Comment: We can convert the `Integer` to `String` here. The following error message can use `String.join` to simplify code. ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -80,30 +84,55 @@ public void testClusterTemplate() { @ClusterTests({ @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { @ClusterConfigProperty(key = "foo", value = "bar"), - @ClusterConfigProperty(key = "spam", value = "eggs") + @ClusterConfigProperty(key = "spam", value = "eggs"), + @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400 }), @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"), + @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300") }), @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200") }) }) - public void testClusterTests() { - if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { + public void testClusterTests() throws ExecutionException, InterruptedException { + if (!clusterInstance.isKRaftTest()) { Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo")); Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam")); Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key")); - } else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) { + + try (Admin admin = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get(); + Assertions.assertEquals(1, configs.size()); + Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value()); Review Comment: Could you please add comments for those assert? ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -140,15 +154,11 @@ public TestKitNodes build() { brokerNodes); } - private int startBrokerId() { - return 0; - } - private int startControllerId() { Review Comment: Could we inline this function? For example: `int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;` ########## core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java: ########## @@ -27,6 +27,27 @@ @Target({ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ClusterConfigProperty { + /** + * The config applies to the controller/broker with specified id. Default is -1, indicating the property applied to + * all controller/broker servers. Note that the "controller" here refers to the KRaft quorum controller. + * The id can vary depending on the different {@link kafka.test.annotation.Type}. + * <ul> + * <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from + * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 1 + * with each additional broker, and there is no controller server under this mode. </li> + * <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from + * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller id + * starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 3000} + * and increases by 1 with each addition broker/controller.</li> + * <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id and controller id both start from + * {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} + * and increases by 1 with each additional broker/controller.</li> + * </ul> + * + * If the id doesn't correspond to any broker/controller server, throw RuntimeException Review Comment: It seems to me `IllegalArgumentException` is more suitable since that is caused by "illegal argument" ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -105,17 +109,27 @@ public TestKitNodes build() { List<Integer> controllerNodeIds = IntStream.range(startControllerId(), startControllerId() + numControllerNodes) .boxed() .collect(Collectors.toList()); - List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes) + List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, BROKER_ID_OFFSET + numBrokerNodes) .boxed() .collect(Collectors.toList()); + Set<Integer> unknownIds = perServerProperties.keySet().stream() + .filter(id -> !controllerNodeIds.contains(id)) + .filter(id -> !brokerNodeIds.contains(id)) + .collect(Collectors.toSet()); + if (!unknownIds.isEmpty()) { + throw new RuntimeException(String.format("Unknown server id %s in perServerProperties", Review Comment: Could you please add "existent id" to the error message? ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -80,30 +84,55 @@ public void testClusterTemplate() { @ClusterTests({ @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { @ClusterConfigProperty(key = "foo", value = "bar"), - @ClusterConfigProperty(key = "spam", value = "eggs") + @ClusterConfigProperty(key = "spam", value = "eggs"), + @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400 }), @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"), + @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300") }), @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200") }) }) - public void testClusterTests() { - if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { + public void testClusterTests() throws ExecutionException, InterruptedException { + if (!clusterInstance.isKRaftTest()) { Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo")); Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam")); Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key")); - } else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) { + + try (Admin admin = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get(); + Assertions.assertEquals(1, configs.size()); + Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value()); + } + } else { Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo")); Assertions.assertEquals("eggz", clusterInstance.config().serverProperties().get("spam")); Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key")); - } else { - Assertions.fail("Unknown cluster type " + clusterInstance.clusterType()); + + try (Admin admin = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get(); + Assertions.assertEquals(1, configs.size()); + Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value()); + } + if (clusterInstance.config().clusterType().equals(Type.KRAFT)) { Review Comment: `clusterInstance.config().clusterType() == Type.KRAFT` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org