chia7712 commented on code in PR #16317:
URL: https://github.com/apache/kafka/pull/16317#discussion_r1642019830


##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -493,36 +496,48 @@ private void alterConfigWithKraft(Admin client, 
Optional<String> brokerId, Map<S
     }
 
     private void verifyConfig(Admin client, Optional<String> brokerId, 
Map<String, String> config) throws Exception {
-        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(""));
         TestUtils.waitForCondition(() -> {
-            Map<String, String> current = 
client.describeConfigs(singletonList(configResource))
-                    .all()
-                    .get()
-                    .values()
-                    .stream()
-                    .flatMap(e -> e.entries().stream())
-                    .collect(HashMap::new, (map, entry) -> 
map.put(entry.name(), entry.value()), HashMap::putAll);
+            Map<String, String> current = getConfigEntryStream(client, 
configResource)
+                    .filter(configEntry -> 
Objects.nonNull(configEntry.value()))
+                    .collect(Collectors.toMap(ConfigEntry::name, 
ConfigEntry::value));
             return config.entrySet().stream().allMatch(e -> 
e.getValue().equals(current.get(e.getKey())));
         }, 10000, config + " are not updated");
     }
 
+    private Stream<ConfigEntry> getConfigEntryStream(Admin client,
+                                                     ConfigResource 
configResource) throws InterruptedException, ExecutionException {
+        return client.describeConfigs(singletonList(configResource))
+                .all()
+                .get()
+                .values()
+                .stream()
+                .flatMap(e -> e.entries().stream());
+    }
+
     private void deleteAndVerifyConfig(Admin client, Optional<String> 
brokerId, Set<String> config) throws Exception {
         ConfigCommand.ConfigCommandOptions deleteOpts =
                 new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId),
                         asList("--delete-config", String.join(",", config))));
         ConfigCommand.alterConfig(client, deleteOpts);
-        verifyConfigDefaultValue(client, brokerId, config);
+        verifyPerBrokerConfigValue(client, brokerId, config);

Review Comment:
   `verifyPerBrokerConfigValue` is used to make sure that the config exists, 
right? That means the test will be flaky if we call it after we "delete" the 
configs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to