This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e070e336cbed43fce59b6013d3852e2633e409b3
Author: Greg Harris <greg.har...@aiven.io>
AuthorDate: Thu Mar 28 13:27:41 2024 -0700

    MINOR: AbstractConfig cleanup (#15597)
    
    Signed-off-by: Greg Harris <greg.har...@aiven.io>
    
    Reviewers: Chris Egerton <chr...@aiven.io>, Mickael Maison <mickael.mai...@gmail.com>, Omnia G H Ibrahim <o.g.h.ibra...@gmail.com>, Matthias J. Sax <matth...@confluent.io>
---
 .../apache/kafka/common/config/AbstractConfig.java |  50 +++++++--
 .../java/org/apache/kafka/common/utils/Utils.java  |  18 ++-
 .../kafka/common/config/AbstractConfigTest.java    | 122 ++++++++++++++++-----
 .../kafka/connect/mirror/MirrorClientConfig.java   |   2 +-
 .../kafka/connect/cli/AbstractConnectCli.java      |   2 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java |   8 +-
 .../connect/runtime/rest/RestServerConfig.java     |   2 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |   1 +
 .../kafka/admin/BrokerApiVersionsCommand.scala     |   2 +-
 .../kafka/controller/PartitionStateMachine.scala   |   2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   6 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../controller/PartitionStateMachineTest.scala     |   4 +-
 13 files changed, 165 insertions(+), 56 deletions(-)
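
For context on the main behavioral change buried in the diff below: AbstractConfig gains a JVM-wide allow list, the org.apache.kafka.automatic.config.providers system property, restricting which ConfigProvider classes may be instantiated automatically from keys found in the configuration itself. A minimal sketch of how that plays out once this patch is applied (FileConfigProvider and EnvVarConfigProvider are real Kafka providers, but the scenario and the ProviderAllowListSketch class are illustrative, not part of this commit):

    import java.util.Properties;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    public class ProviderAllowListSketch {
        public static void main(String[] args) {
            // Allow only the file provider to be instantiated automatically.
            System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
                    "org.apache.kafka.common.config.provider.FileConfigProvider");

            Properties props = new Properties();
            props.put("config.providers", "env");
            props.put("config.providers.env.class",
                    "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

            // EnvVarConfigProvider is not in the allow list, so this throws a
            // ConfigException whose message names the system property to update.
            new AbstractConfig(new ConfigDef(), props);
        }
    }

If the system property is unset, every provider class is allowed and existing behavior is unchanged.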

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 2280813db1e..7c6fdcdf1e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,6 +34,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 /**
  * A convenient base class for configurations to extend.
@@ -58,6 +61,8 @@ public class AbstractConfig {
 
     private final ConfigDef definition;
 
+    public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers";
+
     public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
 
     private static final String CONFIG_PROVIDERS_PARAM = ".param.";
@@ -101,14 +106,10 @@ public class AbstractConfig {
      *                            the constructor to resolve any variables in {@code originals}; may be null or empty
      * @param doLog               whether the configurations should be logged
      */
-    @SuppressWarnings("unchecked")
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
-        /* check that all the keys are really strings */
-        for (Map.Entry<?, ?> entry : originals.entrySet())
-            if (!(entry.getKey() instanceof String))
-                throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
+        Map<String, Object> originalMap = Utils.castToStringObjectMap(originals);
 
-        this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals);
+        this.originals = resolveConfigVariables(configProviderProps, originalMap);
         this.values = definition.parse(this.originals);
         Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values));
         for (Map.Entry<String, Object> update : configUpdates.entrySet()) {
@@ -521,6 +522,7 @@ public class AbstractConfig {
     private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
         Map<String, String> providerConfigString;
         Map<String, ?> configProperties;
+        Predicate<String> classNameFilter;
         Map<String, Object> resolvedOriginals = new HashMap<>();
         // As variable configs are strings, parse the originals and obtain the potential variable configs.
         Map<String, String> indirectVariables = extractPotentialVariables(originals);
@@ -529,11 +531,13 @@ public class AbstractConfig {
         if (configProviderProps == null || configProviderProps.isEmpty()) {
             providerConfigString = indirectVariables;
             configProperties = originals;
+            classNameFilter = automaticConfigProvidersFilter();
         } else {
             providerConfigString = extractPotentialVariables(configProviderProps);
             configProperties = configProviderProps;
+            classNameFilter = ignored -> true;
         }
-        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties);
+        Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
 
         if (!providers.isEmpty()) {
             ConfigTransformer configTransformer = new ConfigTransformer(providers);
@@ -547,6 +551,17 @@ public class AbstractConfig {
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
 
+    private Predicate<String> automaticConfigProvidersFilter() {
+        String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        if (systemProperty == null) {
+            return ignored -> true;
+        } else {
+            return Arrays.stream(systemProperty.split(","))
+                    .map(String::trim)
+                    .collect(Collectors.toSet())::contains;
+        }
+    }
+
     private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) {
         Map<String, Object> result = new HashMap<>();
         for (Map.Entry<String, ?> entry : providerConfigProperties.entrySet()) {
@@ -567,9 +582,14 @@ public class AbstractConfig {
      *
      * @param indirectConfigs          The map of potential variable configs
      * @param providerConfigProperties The map of config provider configs
-     * @return map map of config provider name and its instance.
+     * @param classNameFilter          Filter for config provider class names
+     * @return map of config provider name and its instance.
      */
-    private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) {
+    private Map<String, ConfigProvider> instantiateConfigProviders(
+            Map<String, String> indirectConfigs,
+            Map<String, ?> providerConfigProperties,
+            Predicate<String> classNameFilter
+    ) {
         final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
 
         if (configProviders == null || configProviders.isEmpty()) {
@@ -580,9 +600,15 @@ public class AbstractConfig {
 
         for (String provider : configProviders.split(",")) {
             String providerClass = providerClassProperty(provider);
-            if (indirectConfigs.containsKey(providerClass))
-                providerMap.put(provider, indirectConfigs.get(providerClass));
-
+            if (indirectConfigs.containsKey(providerClass)) {
+                String providerClassName = indirectConfigs.get(providerClass);
+                if (classNameFilter.test(providerClassName)) {
+                    providerMap.put(provider, providerClassName);
+                } else {
+                    throw new ConfigException(providerClassName + " is not allowed. Update System property '"
+                            + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
+                }
+            }
         }
         // Instantiate Config Providers
         Map<String, ConfigProvider> configProviderInstances = new HashMap<>();
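
A note on the filter added in the hunks above: its edge cases are worth spelling out. The following standalone sketch mirrors the logic of automaticConfigProvidersFilter() (names copied from the patch; the helper class itself is illustrative):

    import java.util.Arrays;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    final class FilterSketch {
        // Same shape as automaticConfigProvidersFilter() above.
        static Predicate<String> filter(String property) {
            if (property == null) {
                return ignored -> true; // property unset: no restriction
            }
            return Arrays.stream(property.split(","))
                    .map(String::trim)
                    .collect(Collectors.toSet())::contains;
        }

        public static void main(String[] args) {
            filter(null).test("a.b.Provider");                       // true: allow all
            filter("a.b.Provider, c.d.Other").test("a.b.Provider");  // true: entries are trimmed
            filter("").test("a.b.Provider");                         // false: "".split(",") yields {""},
                                                                     // so an empty value denies everything
        }
    }

The empty-string case is what testAutoConfigResolutionWithInvalidConfigProviderClassExcluded in the test diff below relies on.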
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index bf9d0d8f16d..102f5b30b07 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1435,13 +1435,23 @@ public final class Utils {
      * @return a map including all elements in properties
      */
     public static Map<String, Object> propsToMap(Properties properties) {
-        Map<String, Object> map = new HashMap<>(properties.size());
-        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+        return castToStringObjectMap(properties);
+    }
+
+    /**
+     * Cast a map with arbitrary type keys to be keyed on String.
+     * @param inputMap A map with unknown type keys
+     * @return A map with the same contents as the input map, but with String keys
+     * @throws ConfigException if any key is not a String
+     */
+    public static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {
+        Map<String, Object> map = new HashMap<>(inputMap.size());
+        for (Map.Entry<?, ?> entry : inputMap.entrySet()) {
             if (entry.getKey() instanceof String) {
                 String k = (String) entry.getKey();
-                map.put(k, properties.get(k));
+                map.put(k, entry.getValue());
             } else {
-                throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
+                throw new ConfigException(String.valueOf(entry.getKey()), entry.getValue(), "Key must be a string.");
             }
         }
         return map;
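
Since castToStringObjectMap is now part of Utils, a quick usage sketch (the surrounding class and values are illustrative, not from the patch):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.common.utils.Utils;

    public class CastSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // Properties is Map<Object, Object>; this narrows it safely.
            Map<String, Object> map = Utils.castToStringObjectMap(props);

            Map<Object, Object> bad = new HashMap<>();
            bad.put(42, "value");
            try {
                Utils.castToStringObjectMap(bad);
            } catch (ConfigException e) {
                // Thrown with the offending key rendered via String.valueOf.
            }
        }
    }

Note the small behavioral improvements in the new version: entry.getValue() is used instead of re-fetching with properties.get(k), and String.valueOf avoids the latent NullPointerException that toString() would have thrown on a null key now that the method accepts arbitrary maps.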
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 5859dc1dc12..bf018aebbfc 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.config;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.provider.FileConfigProvider;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.metrics.FakeMetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -26,7 +27,10 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -46,6 +50,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class AbstractConfigTest {
 
+    private String propertyValue;
+
+    @BeforeEach
+    public void setup() {
+        propertyValue = System.getProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+    }
+
+    @AfterEach
+    public void teardown() {
+        if (propertyValue != null) {
+            System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, propertyValue);
+        } else {
+            System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY);
+        }
+    }
+
     @Test
     public void testConfiguredInstances() {
         testValidInputs("    ");
@@ -254,12 +275,7 @@ public class AbstractConfigTest {
         Properties props = new Properties();
         props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
         TestConfig config = new TestConfig(props);
-        try {
-            config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
-            fail("Expected a config exception due to invalid props :" + props);
-        } catch (KafkaException e) {
-            // this is good
-        }
+        assertThrows(KafkaException.class, () -> config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class));
     }
 
     @Test
@@ -349,16 +365,6 @@ public class AbstractConfigTest {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public Map<String, ?> convertPropertiesToMap(Map<?, ?> props) {
-        for (Map.Entry<?, ?> entry : props.entrySet()) {
-            if (!(entry.getKey() instanceof String))
-                throw new ConfigException(entry.getKey().toString(), entry.getValue(),
-                    "Key must be a string.");
-        }
-        return (Map<String, ?>) props;
-    }
-
     @Test
     public void testOriginalWithOverrides() {
         Properties props = new Properties();
@@ -389,6 +395,43 @@ public class AbstractConfigTest {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testOriginalsWithConfigProvidersPropsExcluded() {
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockVaultConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName());
+        Properties props = new Properties();
+
+        // Test Case: Config provider that is not an allowed class
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", 
MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("prefix.ssl.truststore.location.number", 5);
+        props.put("sasl.kerberos.service.name", "service name");
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+        assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+    }
+
+    @Test
+    public void testOriginalsWithConfigProvidersPropsIncluded() {
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName());
+        Properties props = new Properties();
+
+        // Test Case: Config provider that is an allowed class
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", 
MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("prefix.ssl.truststore.location.number", 5);
+        props.put("sasl.kerberos.service.name", "service name");
+        props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
+        props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+        assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
+        assertEquals("randomPassword", 
config.originals().get("sasl.kerberos.password"));
+        MockFileConfigProvider.assertClosed(id);
+    }
+
     @Test
     public void testConfigProvidersPropsAsParam() {
         // Test Case: Valid Test Case for ConfigProviders as a separate variable
@@ -400,7 +443,7 @@ public class AbstractConfigTest {
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
-        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
         assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
         assertEquals("randomPassword", 
config.originals().get("sasl.kerberos.password"));
         MockFileConfigProvider.assertClosed(id);
@@ -417,7 +460,7 @@ public class AbstractConfigTest {
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
-        Map<String, ?> provMap = convertPropertiesToMap(providers);
+        Map<String, ?> provMap = Utils.castToStringObjectMap(providers);
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
         assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
         MockFileConfigProvider.assertClosed(id);
@@ -437,7 +480,7 @@ public class AbstractConfigTest {
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
         props.put("sasl.truststore.key", 
"${vault:/usr/truststore:truststoreKey}");
         props.put("sasl.truststore.password", 
"${vault:/usr/truststore:truststorePassword}");
-        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
         assertEquals("testKey", config.originals().get("sasl.kerberos.key"));
         assertEquals("randomPassword", 
config.originals().get("sasl.kerberos.password"));
         assertEquals("testTruststoreKey", 
config.originals().get("sasl.truststore.key"));
@@ -453,12 +496,33 @@ public class AbstractConfigTest {
         props.put("config.providers.file.class",
             "org.apache.kafka.common.config.provider.InvalidConfigProvider");
         props.put("testKey", "${test:/foo/bar/testpath:testKey}");
-        try {
-            new TestIndirectConfigResolution(props);
-            fail("Expected a config exception due to invalid props :" + props);
-        } catch (KafkaException e) {
-            // this is good
-        }
+        assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props));
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() {
+        String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");
+        // Test Case: Any config provider specified while the system property is empty
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", invalidConfigProvider);
+        props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+        KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+        assertTrue(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
+    }
+
+    @Test
+    public void testAutoConfigResolutionWithInvalidConfigProviderClassIncluded() {
+        String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider";
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, invalidConfigProvider);
+        // Test Case: Invalid config provider specified, but is also included in the system property
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", invalidConfigProvider);
+        props.put("testKey", "${test:/foo/bar/testpath:testKey}");
+        KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+        assertFalse(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY));
     }
 
     @Test
@@ -496,13 +560,15 @@ public class AbstractConfigTest {
         props.put("config.providers", "file");
         props.put("config.providers.file.class", 
MockVaultConfigProvider.class.getName());
 
-        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
         assertEquals("${file:/usr/kerberos:key}", 
config.originals().get("sasl.kerberos.key"));
     }
 
     @Test
     public void testConfigProviderConfigurationWithConfigParams() {
-        // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable
+        // should have no effect
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+        // Test Case: Specify a config provider not allowed, but passed via the trusted providers argument
         Properties providers = new Properties();
         providers.put("config.providers", "vault");
         providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
@@ -512,7 +578,7 @@ public class AbstractConfigTest {
         props.put("sasl.truststore.key", 
"${vault:/usr/truststore:truststoreKey}");
         props.put("sasl.truststore.password", 
"${vault:/usr/truststore:truststorePassword}");
         props.put("sasl.truststore.location", 
"${vault:/usr/truststore:truststoreLocation}");
-        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
+        TestIndirectConfigResolution config = new 
TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers));
         assertEquals("/usr/vault", 
config.originals().get("sasl.truststore.location"));
     }
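
The last two test changes above highlight the design split in resolveConfigVariables: providers passed in through the constructor's configProviderProps argument are treated as trusted (the filter becomes ignored -> true), while providers discovered inside the originals map are subject to the allow list. A hedged sketch of the trusted path (DirectoryConfigProvider is a real Kafka provider; the rest of the scenario is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    public class TrustedProviderSketch {
        public static void main(String[] args) {
            // Even with an empty allow list...
            System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "");

            Map<String, Object> providerProps = new HashMap<>();
            providerProps.put("config.providers", "dir");
            providerProps.put("config.providers.dir.class",
                    "org.apache.kafka.common.config.provider.DirectoryConfigProvider");

            Properties props = new Properties();
            props.put("some.key", "plain value");

            // ...providers supplied via the explicit argument still resolve,
            // because the caller (not the config map) chose them.
            new AbstractConfig(new ConfigDef(), props, providerProps, false);
        }
    }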
 
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
index 477459895c5..053e594fbeb 100644
--- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -76,7 +76,7 @@ public class MirrorClientConfig extends AbstractConfig {
     public static final String PRODUCER_CLIENT_PREFIX = "producer.";
 
     MirrorClientConfig(Map<?, ?> props) {
-        super(CONFIG_DEF, props, true);
+        super(CONFIG_DEF, props, Utils.castToStringObjectMap(props), true);
     }
 
     public ReplicationPolicy replicationPolicy() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index de666c7bd60..c770a19624a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -125,7 +125,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> {
 
         RestClient restClient = new RestClient(config);
 
-        ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, workerProps);
+        ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals());
         restServer.initializeServer();
 
         URI advertisedUrl = restServer.advertisedUrl();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index c1fd69317bc..b1bd8576cbb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -437,9 +437,15 @@ public class WorkerConfig extends AbstractConfig {
     }
 
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {
-        super(definition, props);
+        super(definition, props, Utils.castToStringObjectMap(props), true);
         logInternalConverterRemovalWarnings(props);
         logPluginPathConfigProviderWarning(props);
     }
 
+    @Override
+    public Map<String, Object> originals() {
+        Map<String, Object> map = super.originals();
+        map.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG);
+        return map;
+    }
 }
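
One subtlety in the WorkerConfig override above: AbstractConfig.originals() returns a copy, which is what makes the map.remove(...) safe for the caller. Combined with the AbstractConnectCli change earlier in this diff (passing config.originals() instead of raw workerProps to ConnectRestServer), the config.providers key is stripped before the originals leave the worker, so a downstream AbstractConfig built from them will not instantiate providers automatically (instantiateConfigProviders bails out when that key is absent), as the WorkerTest change below also asserts. A self-contained mimic of the pattern (not Connect code, purely illustrative):

    import java.util.HashMap;
    import java.util.Map;

    class BaseConfig {
        private final Map<String, Object> originals;
        BaseConfig(Map<String, Object> originals) {
            this.originals = new HashMap<>(originals);
        }
        // Like AbstractConfig.originals(), hand out a defensive copy.
        public Map<String, Object> originals() {
            return new HashMap<>(originals);
        }
    }

    class WorkerLikeConfig extends BaseConfig {
        static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
        WorkerLikeConfig(Map<String, Object> originals) {
            super(originals);
        }
        @Override
        public Map<String, Object> originals() {
            Map<String, Object> map = super.originals();
            map.remove(CONFIG_PROVIDERS_CONFIG); // hide provider wiring from callers
            return map;
        }
    }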
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
index 0d6d06a4a59..4b8b5acf935 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -258,7 +258,7 @@ public abstract class RestServerConfig extends AbstractConfig {
     }
 
     protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) {
-        super(configDef, props);
+        super(configDef, props, Utils.castToStringObjectMap(props), true);
     }
 
     // Visible for testing
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f88729af868..41eb01e4b8b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -1119,6 +1119,7 @@ public class WorkerTest {
 
         Map<String, Object> connConfig = Collections.singletonMap("metadata.max.age.ms", "10000");
         Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+        expectedConfigs.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG);
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index bcce48f19a9..89106128e61 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -260,7 +260,7 @@ object BrokerApiVersionsCommand {
       config
     }
 
-    class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false)
+    class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, Utils.castToStringObjectMap(originals.asJava), false)
 
     def create(props: Properties): AdminClient = create(props.asScala.toMap)
 
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 71b163a2e22..ff0dc72b38e 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig,
     } else {
       val (logConfigs, failed) = zkClient.getLogConfigs(
         partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet,
-        config.originals()
+        config.extractLogConfigMap
       )
 
       partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 610634f8598..89710972d18 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -193,7 +193,7 @@ object DynamicBrokerConfig {
 
   private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = {
     val props = new Properties
-    val config = new AbstractConfig(new ConfigDef(), propsOriginal, false)
+    val config = new AbstractConfig(new ConfigDef(), propsOriginal, Utils.castToStringObjectMap(propsOriginal), false)
     config.originals.forEach { (key, value) =>
       if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
         props.put(key, value)
@@ -729,13 +729,13 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
     val originalLogConfig = logManager.currentDefaultConfig
     val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
-    newConfig.valuesFromThisConfig.forEach { (k, v) =>
+    newConfig.extractLogConfigMap.forEach { (k, v) =>
       if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
         DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
           if (v == null)
              newBrokerDefaults.remove(configName)
           else
-            newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
+            newBrokerDefaults.put(configName, v)
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4a11c70c6a5..3b3ec32381f 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1602,7 +1602,7 @@ object KafkaConfig {
 }
 
 class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig])
-  extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging {
+  extends AbstractConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging {
 
   def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None)
   def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None)
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 10cbe589045..183e8657e0d 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -258,7 +258,7 @@ class PartitionStateMachineTest {
       .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0))))
 
-    when(mockZkClient.getLogConfigs(Set.empty, config.originals()))
+    when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap))
       .thenReturn((Map(partition.topic -> new LogConfig(new Properties)), Map.empty[String, Exception]))
     val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId)
     val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2)
@@ -434,7 +434,7 @@ class PartitionStateMachineTest {
     }
     prepareMockToGetTopicPartitionsStatesRaw()
     def prepareMockToGetLogConfigs(): Unit = {
-      when(mockZkClient.getLogConfigs(Set.empty, config.originals())).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
+      when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap)).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception]))
     }
     prepareMockToGetLogConfigs()
 
