This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f79e429044c KAFKA-16939 Revisit ConfigCommandIntegrationTest (#16317) f79e429044c is described below commit f79e429044cfe3701f48d54ff79ee2b42841ed99 Author: Ken Huang <s7133...@gmail.com> AuthorDate: Thu Jun 20 10:40:47 2024 +0900 KAFKA-16939 Revisit ConfigCommandIntegrationTest (#16317) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../kafka/admin/ConfigCommandIntegrationTest.java | 80 ++++++++++++++-------- 1 file changed, 53 insertions(+), 27 deletions(-) diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java index fc88424b842..045a9377f67 100644 --- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java +++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java @@ -28,6 +28,7 @@ import kafka.zk.BrokerInfo; import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.network.ListenerName; @@ -48,6 +49,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -229,15 +231,17 @@ public class ConfigCommandIntegrationTest { alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "140000")); // Delete config - deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.bytes")); + deleteAndVerifyConfigValue(client, defaultBrokerId, singleton("message.max.bytes"), true); // Listener configs: should work only with listener name alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); - 
alterConfigWithKraft(client, Optional.empty(), - singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); - deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), - singleton("listener.name.internal.ssl.keystore.location")); + // Per-broker config configured at default cluster-level should fail + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.empty(), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfigValue(client, defaultBrokerId, + singleton("listener.name.internal.ssl.keystore.location"), false); alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret")); @@ -360,7 +364,7 @@ public class ConfigCommandIntegrationTest { alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap(listenerName + "ssl.truststore.password", "password")); - verifyConfigDefaultValue(client, Optional.of(defaultBrokerId), + verifyConfigSecretValue(client, Optional.of(defaultBrokerId), singleton(listenerName + "ssl.truststore.password")); } } @@ -478,8 +482,7 @@ public class ConfigCommandIntegrationTest { private List<String> generateDefaultAlterOpts(String bootstrapServers) { return asList("--bootstrap-server", bootstrapServers, - "--entity-type", "brokers", - "--entity-name", "0", "--alter"); + "--entity-type", "brokers", "--alter"); } private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception { @@ -495,36 +498,59 @@ public class ConfigCommandIntegrationTest { } private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception { - ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId)); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse("")); TestUtils.waitForCondition(() -> { - 
Map<String, String> current = client.describeConfigs(singletonList(configResource)) - .all() - .get() - .values() - .stream() - .flatMap(e -> e.entries().stream()) - .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll); + Map<String, String> current = getConfigEntryStream(client, configResource) + .filter(configEntry -> Objects.nonNull(configEntry.value())) + .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)); return config.entrySet().stream().allMatch(e -> e.getValue().equals(current.get(e.getKey()))); }, 10000, config + " are not updated"); } - private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> config) throws Exception { + private Stream<ConfigEntry> getConfigEntryStream(Admin client, + ConfigResource configResource) throws InterruptedException, ExecutionException { + return client.describeConfigs(singletonList(configResource)) + .all() + .get() + .values() + .stream() + .flatMap(e -> e.entries().stream()); + } + + private void deleteAndVerifyConfigValue(Admin client, + String brokerId, + Set<String> config, + boolean hasDefaultValue) throws Exception { ConfigCommand.ConfigCommandOptions deleteOpts = - new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, asList("--entity-name", brokerId), asList("--delete-config", String.join(",", config)))); ConfigCommand.alterConfig(client, deleteOpts); - verifyConfigDefaultValue(client, brokerId, config); + verifyPerBrokerConfigValue(client, brokerId, config, hasDefaultValue); + } + + private void verifyPerBrokerConfigValue(Admin client, + String brokerId, + Set<String> config, + boolean hasDefaultValue) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId); + TestUtils.waitForCondition(() -> { + if (hasDefaultValue) { + Map<String, String> current = getConfigEntryStream(client, configResource) + 
.filter(configEntry -> Objects.nonNull(configEntry.value())) + .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value)); + return config.stream().allMatch(current::containsKey); + } else { + return getConfigEntryStream(client, configResource) + .noneMatch(configEntry -> config.contains(configEntry.name())); + } + }, 5000, config + " are not updated"); } - private void verifyConfigDefaultValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception { - ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId)); + private void verifyConfigSecretValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse("")); TestUtils.waitForCondition(() -> { - Map<String, String> current = client.describeConfigs(singletonList(configResource)) - .all() - .get() - .values() - .stream() - .flatMap(e -> e.entries().stream()) + Map<String, String> current = getConfigEntryStream(client, configResource) + .filter(ConfigEntry::isSensitive) .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll); return config.stream().allMatch(current::containsKey); }, 5000, config + " are not updated");