This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f79e429044c KAFKA-16939 Revisit ConfigCommandIntegrationTest (#16317)
f79e429044c is described below

commit f79e429044cfe3701f48d54ff79ee2b42841ed99
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Thu Jun 20 10:40:47 2024 +0900

    KAFKA-16939 Revisit ConfigCommandIntegrationTest (#16317)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../kafka/admin/ConfigCommandIntegrationTest.java  | 80 ++++++++++++++--------
 1 file changed, 53 insertions(+), 27 deletions(-)

diff --git a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
index fc88424b842..045a9377f67 100644
--- a/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
+++ b/core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java
@@ -28,6 +28,7 @@ import kafka.zk.BrokerInfo;
 import kafka.zk.KafkaZkClient;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.network.ListenerName;
@@ -48,6 +49,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -229,15 +231,17 @@ public class ConfigCommandIntegrationTest {
             alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.bytes", "140000"));
 
             // Delete config
-            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.bytes"));
+            deleteAndVerifyConfigValue(client, defaultBrokerId, singleton("message.max.bytes"), true);
 
             // Listener configs: should work only with listener name
             alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
                     singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
-            alterConfigWithKraft(client, Optional.empty(),
-                    singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"));
-            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
-                    singleton("listener.name.internal.ssl.keystore.location"));
+            // Per-broker config configured at default cluster-level should fail
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, Optional.empty(), 
+                            singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")));
+            deleteAndVerifyConfigValue(client, defaultBrokerId,
+                    singleton("listener.name.internal.ssl.keystore.location"), false);
             alterConfigWithKraft(client, Optional.of(defaultBrokerId),
                     singletonMap("listener.name.external.ssl.keystore.password", "secret"));
 
@@ -360,7 +364,7 @@ public class ConfigCommandIntegrationTest {
 
             alterConfigWithKraft(client, Optional.of(defaultBrokerId),
                     singletonMap(listenerName + "ssl.truststore.password", "password"));
-            verifyConfigDefaultValue(client, Optional.of(defaultBrokerId),
+            verifyConfigSecretValue(client, Optional.of(defaultBrokerId),
                     singleton(listenerName + "ssl.truststore.password"));
         }
     }
@@ -478,8 +482,7 @@ public class ConfigCommandIntegrationTest {
 
     private List<String> generateDefaultAlterOpts(String bootstrapServers) {
         return asList("--bootstrap-server", bootstrapServers,
-                "--entity-type", "brokers",
-                "--entity-name", "0", "--alter");
+                "--entity-type", "brokers", "--alter");
     }
 
     private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
@@ -495,36 +498,59 @@ public class ConfigCommandIntegrationTest {
     }
 
     private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String> config) throws Exception {
-        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
         TestUtils.waitForCondition(() -> {
-            Map<String, String> current = client.describeConfigs(singletonList(configResource))
-                    .all()
-                    .get()
-                    .values()
-                    .stream()
-                    .flatMap(e -> e.entries().stream())
-                    .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
+            Map<String, String> current = getConfigEntryStream(client, configResource)
+                    .filter(configEntry -> Objects.nonNull(configEntry.value()))
+                    .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
             return config.entrySet().stream().allMatch(e -> e.getValue().equals(current.get(e.getKey())));
         }, 10000, config + " are not updated");
     }
 
-    private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+    private Stream<ConfigEntry> getConfigEntryStream(Admin client,
+                                                     ConfigResource configResource) throws InterruptedException, ExecutionException {
+        return client.describeConfigs(singletonList(configResource))
+                .all()
+                .get()
+                .values()
+                .stream()
+                .flatMap(e -> e.entries().stream());
+    }
+
+    private void deleteAndVerifyConfigValue(Admin client, 
+                                            String brokerId, 
+                                            Set<String> config, 
+                                            boolean hasDefaultValue) throws Exception {
         ConfigCommand.ConfigCommandOptions deleteOpts =
-                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, asList("--entity-name", brokerId),
                         asList("--delete-config", String.join(",", config))));
         ConfigCommand.alterConfig(client, deleteOpts);
-        verifyConfigDefaultValue(client, brokerId, config);
+        verifyPerBrokerConfigValue(client, brokerId, config, hasDefaultValue);
+    }
+
+    private void verifyPerBrokerConfigValue(Admin client,
+                                            String brokerId,
+                                            Set<String> config,
+                                            boolean hasDefaultValue) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
+        TestUtils.waitForCondition(() -> {
+            if (hasDefaultValue) {
+                Map<String, String> current = getConfigEntryStream(client, configResource)
+                        .filter(configEntry -> Objects.nonNull(configEntry.value()))
+                        .collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
+                return config.stream().allMatch(current::containsKey);
+            } else {
+                return getConfigEntryStream(client, configResource)
+                        .noneMatch(configEntry -> config.contains(configEntry.name()));
+            }
+        }, 5000, config + " are not updated");
     }
 
-    private void verifyConfigDefaultValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
-        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+    private void verifyConfigSecretValue(Admin client, Optional<String> brokerId, Set<String> config) throws Exception {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
         TestUtils.waitForCondition(() -> {
-            Map<String, String> current = client.describeConfigs(singletonList(configResource))
-                    .all()
-                    .get()
-                    .values()
-                    .stream()
-                    .flatMap(e -> e.entries().stream())
+            Map<String, String> current = getConfigEntryStream(client, configResource)
+                    .filter(ConfigEntry::isSensitive)
                     .collect(HashMap::new, (map, entry) -> map.put(entry.name(), entry.value()), HashMap::putAll);
             return config.stream().allMatch(current::containsKey);
         }, 5000, config + " are not updated");

Reply via email to