This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6a0ea56d89 KAFKA-17171 Add test cases for `STATIC_BROKER_CONFIG`in
kraft mode (#18463)
a6a0ea56d89 is described below
commit a6a0ea56d89c811478fb18cdf9f9a4d19ea6fb88
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Mar 18 00:30:53 2025 +0800
KAFKA-17171 Add test cases for `STATIC_BROKER_CONFIG`in kraft mode (#18463)
Given that the `core` module will be separated into other small modules,
this test will not be added to the core module.
Instead, I added it to the `clients-integration-tests` module since it
focuses on the admin client test. The patch should include following test
cases.
1. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, but `describeConfigs`
does not.
2. a topic-related static config is added to quorum controller. The
configs from topic creation should include it, and `describeConfigs`
does if admin is using controller.bootstrap
3. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, but `describeConfigs` does.
4. a topic-related static config is added to broker. The configs from
topic creation should NOT include it, and `describeConfigs` does not
also if admin is using controller.bootstrap
for another, the docs of `STATIC_BROKER_CONFIG` should remind the impact of
"controller.properties" BTW, those test cases should leverage new test infra,
since new test infra allow us to define configs to broker/controller
individually.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../clients/admin/StaticBrokerConfigTest.java | 115 +++++++++++++++++++++
.../BootstrapControllersIntegrationTest.java | 21 ++++
2 files changed, 136 insertions(+)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
new file mode 100644
index 00000000000..9e56e6bd3e6
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+public class StaticBrokerConfigTest {
+ private static final String TOPIC = "topic";
+ private static final String CUSTOM_VALUE = "1048576";
+
+ /**
+ * synonyms of `segment.bytes`
+ */
+ private static final String LOG_SEGMENT_BYTES = "log.segment.bytes";
+
+ @ClusterTest(types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(id = 3000, key = LOG_SEGMENT_BYTES, value =
CUSTOM_VALUE)
+ })
+ public void
testTopicConfigsGetImpactedIfStaticConfigsAddToController(ClusterInstance
cluster)
+ throws ExecutionException, InterruptedException {
+ try (
+ Admin admin = cluster.admin();
+ Admin adminUsingBootstrapController = cluster.admin(Map.of(), true)
+ ) {
+ ConfigEntry configEntry = admin.createTopics(List.of(new
NewTopic(TOPIC, 1, (short) 1)))
+ .config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG);
+ assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
configEntry.source());
+ assertEquals(CUSTOM_VALUE, configEntry.value(), "Config value
should be custom value since controller has related static config");
+
+ ConfigResource brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ configEntry =
admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES);
+ assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
configEntry.source());
+ assertNotEquals(CUSTOM_VALUE,
+ configEntry.value(),
+ "Config value should not be custom value since broker doesn't
have related static config");
+
+ ConfigResource controllerResource = new
ConfigResource(ConfigResource.Type.BROKER, "3000");
+ configEntry =
adminUsingBootstrapController.describeConfigs(List.of(controllerResource))
+ .all().get().get(controllerResource).get(LOG_SEGMENT_BYTES);
+ assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
configEntry.source());
+ assertEquals(CUSTOM_VALUE,
+ configEntry.value(),
+ "Config value should be custom value since controller has
related static config");
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(id = 0, key = LOG_SEGMENT_BYTES, value =
CUSTOM_VALUE)
+ })
+ public void
testTopicConfigsGetImpactedIfStaticConfigsAddToBroker(ClusterInstance cluster)
+ throws ExecutionException, InterruptedException {
+ try (
+ Admin admin = cluster.admin();
+ Admin adminUsingBootstrapController = cluster.admin(Map.of(), true)
+ ) {
+ ConfigEntry configEntry = admin.createTopics(List.of(new
NewTopic(TOPIC, 1, (short) 1)))
+ .config(TOPIC).get().get(TopicConfig.SEGMENT_BYTES_CONFIG);
+ assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
configEntry.source());
+ assertNotEquals(CUSTOM_VALUE,
+ configEntry.value(),
+ "Config value should not be custom value since controller
doesn't have static config");
+
+ ConfigResource brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ configEntry =
admin.describeConfigs(List.of(brokerResource)).all().get().get(brokerResource).get(LOG_SEGMENT_BYTES);
+ assertEquals(ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
configEntry.source());
+ assertEquals(CUSTOM_VALUE,
+ configEntry.value(),
+ "Config value should be custom value since broker has related
static config");
+
+ ConfigResource controllerResource = new
ConfigResource(ConfigResource.Type.BROKER, "3000");
+ configEntry =
adminUsingBootstrapController.describeConfigs(List.of(controllerResource))
+ .all().get().get(controllerResource).get(LOG_SEGMENT_BYTES);
+ assertEquals(ConfigEntry.ConfigSource.DEFAULT_CONFIG,
configEntry.source());
+ assertNotEquals(CUSTOM_VALUE,
+ configEntry.value(),
+ "Config value should not be custom value since controller
doesn't have related static config");
+ }
+ }
+}
diff --git
a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
index dcee16b8ec6..17ba071beb0 100644
--- a/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
+++ b/core/src/test/java/kafka/server/BootstrapControllersIntegrationTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
@@ -77,6 +78,7 @@ import java.util.stream.Collectors;
import static
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG;
import static
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
+import static
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static
org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -342,4 +344,23 @@ public class BootstrapControllersIntegrationTest {
assertEquals(aclBinding, deletedAclBindings.iterator().next());
}
}
+
+ @ClusterTest(
+ brokers = 2,
+ serverProperties = {
+ @ClusterConfigProperty(key =
TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, value = "2")
+ }
+ )
+ public void testDescribeConfigs(ClusterInstance clusterInstance) throws
Exception {
+ try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
+ ConfigResource resource = new ConfigResource(BROKER, "");
+ Map<ConfigResource, Config> resourceToConfig =
admin.describeConfigs(List.of(resource)).all().get();
+ Config config = resourceToConfig.get(resource);
+ assertNotNull(config);
+ ConfigEntry configEntry =
config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
+ assertEquals(DYNAMIC_DEFAULT_BROKER_CONFIG, configEntry.source());
+ assertNotNull(configEntry);
+ assertEquals("2", configEntry.value());
+ }
+ }
}