OmniaGM commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1502,13 +1502,23 @@ public static <K, V> Map<K, V> filterMap(final Map<K, V> map, final Predicate<En
      * @return a map including all elements in properties
      */
     public static Map<String, Object> propsToMap(Properties properties) {
-        Map<String, Object> map = new HashMap<>(properties.size());
-        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+        return castToStringObjectMap(properties);
+    }
+
+    /**
+     * Cast a map with arbitrary type keys to be keyed on String.
+     * @param inputMap A map with unknown type keys
+     * @return A map with the same contents as the input map, but with String keys
+     * @throws ConfigException if any key is not a String
+     */
+    public static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {

Review Comment:
   There is a similar function to this one, 
`AbstractConfigTest.convertPropertiesToMap`; maybe we can drop the one in 
`AbstractConfigTest` and reuse this one.
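
For illustration, here is a minimal sketch of what a helper with the Javadoc contract above might look like. The diff is cut off at the method signature, so the body is an assumption, and `IllegalArgumentException` stands in for Kafka's `ConfigException` to keep the snippet self-contained. The class name `StringKeyedMapSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class StringKeyedMapSketch {
    // Assumed body: copies every entry and rejects any key that is not a String.
    // The real method is documented to throw ConfigException; IllegalArgumentException
    // is used here only so the sketch compiles without Kafka on the classpath.
    static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {
        Map<String, Object> map = new HashMap<>(inputMap.size());
        for (Map.Entry<?, ?> entry : inputMap.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new IllegalArgumentException("Key must be a string: " + entry.getKey());
            }
            map.put((String) entry.getKey(), entry.getValue());
        }
        return map;
    }

    public static void main(String[] args) {
        // Properties is a Map<Object, Object>, so it can be passed directly.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        System.out.println(castToStringObjectMap(props));
    }
}
```

Because `Properties` is itself a `Map<Object, Object>`, a test helper that converts properties to a map could delegate to a method of this shape instead of keeping its own copy of the loop.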



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -925,10 +926,13 @@ static Map<String, Object> adminConfigs(String connName,
         // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
         // and those that begin with "producer." and "consumer.", since we know they aren't intended for
         // the admin client
+        // Also ignore the config.providers configurations because the worker-configured ConfigProviders should
+        // already have been evaluated via the trusted WorkerConfig constructor
         Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
                 .filter(e -> !e.getKey().startsWith("admin.")
                         && !e.getKey().startsWith("producer.")
-                        && !e.getKey().startsWith("consumer."))
+                        && !e.getKey().startsWith("consumer.")
+                        && !e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG))

Review Comment:
   Small suggestion: since the list of prefixes is short, I would suggest 
refactoring this to something like the following:
   ```
   // A List rather than a Stream: the filter lambda runs once per entry,
   // and a Stream can only be consumed a single time.
   List<String> prefixes = Arrays.asList("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG);
   Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
           .filter(e -> prefixes.stream().noneMatch(p -> e.getKey().startsWith(p)))
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```
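
To make the prefix-list idea easy to try outside the worker, here is a small self-contained sketch. The class and method names are hypothetical, and the literal `"config.providers"` is assumed to match `AbstractConfig.CONFIG_PROVIDERS_CONFIG`. The prefixes are kept in a `List` so that a fresh stream can be opened for every map entry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PrefixFilterSketch {
    // "config.providers" is assumed to equal AbstractConfig.CONFIG_PROVIDERS_CONFIG.
    static final List<String> IGNORED_PREFIXES =
            Arrays.asList("admin.", "producer.", "consumer.", "config.providers");

    // Keeps only the entries whose key starts with none of the ignored prefixes.
    static Map<String, Object> withoutIgnoredPrefixes(Map<String, Object> originals) {
        return originals.entrySet().stream()
                .filter(e -> IGNORED_PREFIXES.stream().noneMatch(p -> e.getKey().startsWith(p)))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put("bootstrap.servers", "localhost:9092");
        originals.put("admin.retries", 3);
        originals.put("producer.acks", "all");
        originals.put("config.providers", "file");
        System.out.println(withoutIgnoredPrefixes(originals));
    }
}
```

Note that `Collectors.toMap` rejects null values, so a sketch like this only fits maps whose values are known to be non-null.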
   



##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -580,8 +601,15 @@ private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, Strin
 
         for (String provider : configProviders.split(",")) {
             String providerClass = providerClassProperty(provider);
-            if (indirectConfigs.containsKey(providerClass))
-                providerMap.put(provider, indirectConfigs.get(providerClass));
+            if (indirectConfigs.containsKey(providerClass)) {
+                String providerClassName = indirectConfigs.get(providerClass);
+                if (classNameFilter.test(providerClassName)) {
+                    providerMap.put(provider, providerClassName);
+                } else {
+                    throw new ConfigException(providerClassName + " is not allowed. Update System property '"
+                            + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
+                }
+            }
 

Review Comment:
   There is an extra blank line here that needs to be removed.



##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -580,8 +601,15 @@ private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, Strin
 
         for (String provider : configProviders.split(",")) {
             String providerClass = providerClassProperty(provider);
-            if (indirectConfigs.containsKey(providerClass))
-                providerMap.put(provider, indirectConfigs.get(providerClass));
+            if (indirectConfigs.containsKey(providerClass)) {
+                String providerClassName = indirectConfigs.get(providerClass);
+                if (classNameFilter.test(providerClassName)) {

Review Comment:
   Small suggestion: instead of having two nested `if`s inside a `for`, maybe 
we can do something like this:
   ```
   Optional<String> providerClassName = Optional.ofNullable(indirectConfigs.get(providerClass));
   boolean isAllowed = providerClassName.map(classNameFilter::test).orElse(false);
   if (isAllowed) {
       providerMap.put(provider, providerClassName.get());
   } else {
       // This branch is also taken when no class is configured for the provider.
       // Unwrap the Optional so the message shows the class name, not "Optional[...]".
       String name = providerClassName.orElse(null);
       throw new ConfigException(name + " is not allowed. Update System property '"
               + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + name);
   }
   ```
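
Another way to flatten the nesting while keeping the behavior of the code under review (a provider with no class entry is silently skipped, a disallowed class is rejected) is a pair of guard clauses. The sketch below is self-contained and makes several assumptions: the class and method names are hypothetical, the key layout `config.providers.<name>.class` stands in for `providerClassProperty(provider)`, and `IllegalArgumentException` stands in for `ConfigException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class ProviderAllowListSketch {
    static Map<String, String> resolveProviders(String configProviders,
                                                Map<String, String> indirectConfigs,
                                                Predicate<String> classNameFilter) {
        Map<String, String> providerMap = new HashMap<>();
        for (String provider : configProviders.split(",")) {
            // Assumed key layout, standing in for providerClassProperty(provider).
            String providerClassName = indirectConfigs.get("config.providers." + provider + ".class");
            if (providerClassName == null)
                continue; // no class configured for this provider: skip it
            if (!classNameFilter.test(providerClassName))
                // Stand-in for ConfigException in the real code.
                throw new IllegalArgumentException(providerClassName + " is not allowed");
            providerMap.put(provider, providerClassName);
        }
        return providerMap;
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("config.providers.file.class", "org.example.FileProvider");
        // Hypothetical allow-list: only classes under org.example pass the filter.
        System.out.println(resolveProviders("file,env", configs, name -> name.startsWith("org.example.")));
    }
}
```

Each condition sits at the same indentation level inside the loop, and the error message is built from a plain `String`, so there is no `Optional` to unwrap.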


