This is an automated email from the ASF dual-hosted git repository. gharris pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new d9674c6c9a9 MINOR: AbstractConfig cleanup (#15597) d9674c6c9a9 is described below commit d9674c6c9a99c0d8a9c32386b0cceaceac92969c Author: Greg Harris <greg.har...@aiven.io> AuthorDate: Thu Mar 28 13:27:41 2024 -0700 MINOR: AbstractConfig cleanup (#15597) Signed-off-by: Greg Harris <greg.har...@aiven.io> Reviewers: Chris Egerton <chr...@aiven.io>, Mickael Maison <mickael.mai...@gmail.com>, Omnia G H Ibrahim <o.g.h.ibra...@gmail.com>, Matthias J. Sax <matth...@confluent.io> --- .../apache/kafka/common/config/AbstractConfig.java | 51 +++++++-- .../java/org/apache/kafka/common/utils/Utils.java | 18 ++- .../kafka/common/config/AbstractConfigTest.java | 122 ++++++++++++++++----- .../kafka/connect/mirror/MirrorClientConfig.java | 2 +- .../kafka/connect/cli/AbstractConnectCli.java | 2 +- .../apache/kafka/connect/runtime/WorkerConfig.java | 8 +- .../connect/runtime/rest/RestServerConfig.java | 2 +- .../apache/kafka/connect/runtime/WorkerTest.java | 1 + .../kafka/admin/BrokerApiVersionsCommand.scala | 2 +- .../kafka/controller/PartitionStateMachine.scala | 2 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 6 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- .../controller/PartitionStateMachineTest.scala | 4 +- 13 files changed, 166 insertions(+), 56 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 84bae97a03a..aeb7f07a29c 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -33,6 +34,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Collectors; /** * A convenient base class for configurations to extend. @@ -58,6 +61,8 @@ public class AbstractConfig { private final ConfigDef definition; + public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers"; + public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; private static final String CONFIG_PROVIDERS_PARAM = ".param."; @@ -101,14 +106,11 @@ public class AbstractConfig { * the constructor to resolve any variables in {@code originals}; may be null or empty * @param doLog whether the configurations should be logged */ - @SuppressWarnings({"unchecked", "this-escape"}) + @SuppressWarnings({"this-escape"}) public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) { - /* check that all the keys are really strings */ - for (Map.Entry<?, ?> entry : originals.entrySet()) - if (!(entry.getKey() instanceof String)) - throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); + Map<String, Object> originalMap = Utils.castToStringObjectMap(originals); - this.originals = resolveConfigVariables(configProviderProps, (Map<String, Object>) originals); + this.originals = resolveConfigVariables(configProviderProps, originalMap); this.values = definition.parse(this.originals); Map<String, Object> configUpdates = postProcessParsedConfig(Collections.unmodifiableMap(this.values)); for (Map.Entry<String, Object> update : configUpdates.entrySet()) { @@ -521,6 +523,7 @@ public class AbstractConfig { private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) { Map<String, String> providerConfigString; Map<String, ?> configProperties; + Predicate<String> classNameFilter; Map<String, Object> resolvedOriginals = new HashMap<>(); // As variable configs are strings, parse the originals and obtain the potential variable configs. Map<String, String> indirectVariables = extractPotentialVariables(originals); @@ -529,11 +532,13 @@ public class AbstractConfig { if (configProviderProps == null || configProviderProps.isEmpty()) { providerConfigString = indirectVariables; configProperties = originals; + classNameFilter = automaticConfigProvidersFilter(); } else { providerConfigString = extractPotentialVariables(configProviderProps); configProperties = configProviderProps; + classNameFilter = ignored -> true; } - Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties); + Map<String, ConfigProvider> providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); if (!providers.isEmpty()) { ConfigTransformer configTransformer = new ConfigTransformer(providers); @@ -547,6 +552,17 @@ public class AbstractConfig { return new ResolvingMap<>(resolvedOriginals, originals); } + private Predicate<String> automaticConfigProvidersFilter() { + String systemProperty = System.getProperty(AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); + if (systemProperty == null) { + return ignored -> true; + } else { + return Arrays.stream(systemProperty.split(",")) + .map(String::trim) + .collect(Collectors.toSet())::contains; + } + } + private Map<String, Object> configProviderProperties(String configProviderPrefix, Map<String, ?> providerConfigProperties) { Map<String, Object> result = new HashMap<>(); for (Map.Entry<String, ?> entry : providerConfigProperties.entrySet()) { @@ -567,9 +583,14 @@ public class AbstractConfig { * * @param indirectConfigs The map of potential variable configs * @param providerConfigProperties The map of config provider configs - * @return map map of config provider name and its instance. + * @param classNameFilter Filter for config provider class names + * @return map of config provider name and its instance. */ - private Map<String, ConfigProvider> instantiateConfigProviders(Map<String, String> indirectConfigs, Map<String, ?> providerConfigProperties) { + private Map<String, ConfigProvider> instantiateConfigProviders( + Map<String, String> indirectConfigs, + Map<String, ?> providerConfigProperties, + Predicate<String> classNameFilter + ) { final String configProviders = indirectConfigs.get(CONFIG_PROVIDERS_CONFIG); if (configProviders == null || configProviders.isEmpty()) { @@ -580,9 +601,15 @@ public class AbstractConfig { for (String provider : configProviders.split(",")) { String providerClass = providerClassProperty(provider); - if (indirectConfigs.containsKey(providerClass)) - providerMap.put(provider, indirectConfigs.get(providerClass)); - + if (indirectConfigs.containsKey(providerClass)) { + String providerClassName = indirectConfigs.get(providerClass); + if (classNameFilter.test(providerClassName)) { + providerMap.put(provider, providerClassName); + } else { + throw new ConfigException(providerClassName + " is not allowed. Update System property '" + + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName); + } + } } // Instantiate Config Providers Map<String, ConfigProvider> configProviderInstances = new HashMap<>(); diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index c316b7a1816..3fd3833de1e 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1502,13 +1502,23 @@ public final class Utils { * @return a map including all elements in properties */ public static Map<String, Object> propsToMap(Properties properties) { - Map<String, Object> map = new HashMap<>(properties.size()); - for (Map.Entry<Object, Object> entry : properties.entrySet()) { + return castToStringObjectMap(properties); + } + + /** + * Cast a map with arbitrary type keys to be keyed on String. + * @param inputMap A map with unknown type keys + * @return A map with the same contents as the input map, but with String keys + * @throws ConfigException if any key is not a String + */ + public static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) { + Map<String, Object> map = new HashMap<>(inputMap.size()); + for (Map.Entry<?, ?> entry : inputMap.entrySet()) { if (entry.getKey() instanceof String) { String k = (String) entry.getKey(); - map.put(k, properties.get(k)); + map.put(k, entry.getValue()); } else { - throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); + throw new ConfigException(String.valueOf(entry.getKey()), entry.getValue(), "Key must be a string."); } } return map; diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index 5859dc1dc12..bf018aebbfc 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.provider.FileConfigProvider; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.metrics.FakeMetricsReporter; import org.apache.kafka.common.metrics.JmxReporter; @@ -26,7 +27,10 @@ import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.security.TestSecurityConfig; import org.apache.kafka.common.config.provider.MockVaultConfigProvider; import org.apache.kafka.common.config.provider.MockFileConfigProvider; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.MockConsumerInterceptor; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -46,6 +50,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class AbstractConfigTest { + private String propertyValue; + + @BeforeEach + public void setup() { + propertyValue = System.getProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); + System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); + } + + @AfterEach + public void teardown() { + if (propertyValue != null) { + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, propertyValue); + } else { + System.clearProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY); + } + } + @Test public void testConfiguredInstances() { testValidInputs(" "); @@ -254,12 +275,7 @@ public class AbstractConfigTest { Properties props = new Properties(); props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); TestConfig config = new TestConfig(props); - try { - config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); - fail("Expected a config exception due to invalid props :" + props); - } catch (KafkaException e) { - // this is good - } + assertThrows(KafkaException.class, () -> config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class)); } @Test @@ -349,16 +365,6 @@ public class AbstractConfigTest { } } - @SuppressWarnings("unchecked") - public Map<String, ?> convertPropertiesToMap(Map<?, ?> props) { - for (Map.Entry<?, ?> entry : props.entrySet()) { - if (!(entry.getKey() instanceof String)) - throw new ConfigException(entry.getKey().toString(), entry.getValue(), - "Key must be a string."); - } - return (Map<String, ?>) props; - } - @Test public void testOriginalWithOverrides() { Properties props = new Properties(); @@ -389,6 +395,43 @@ public class AbstractConfigTest { MockFileConfigProvider.assertClosed(id); } + @Test + public void testOriginalsWithConfigProvidersPropsExcluded() { + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockVaultConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName()); + Properties props = new Properties(); + + // Test Case: Config provider that is not an allowed class + props.put("config.providers", "file"); + props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); + String id = UUID.randomUUID().toString(); + props.put("config.providers.file.param.testId", id); + props.put("prefix.ssl.truststore.location.number", 5); + props.put("sasl.kerberos.service.name", "service name"); + props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); + props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); + assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); + } + + @Test + public void testOriginalsWithConfigProvidersPropsIncluded() { + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName() + " , " + FileConfigProvider.class.getName()); + Properties props = new Properties(); + + // Test Case: Config provider that is an allowed class + props.put("config.providers", "file"); + props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); + String id = UUID.randomUUID().toString(); + props.put("config.providers.file.param.testId", id); + props.put("prefix.ssl.truststore.location.number", 5); + props.put("sasl.kerberos.service.name", "service name"); + props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); + props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap()); + assertEquals("testKey", config.originals().get("sasl.kerberos.key")); + assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); + MockFileConfigProvider.assertClosed(id); + } + @Test public void testConfigProvidersPropsAsParam() { // Test Case: Valid Test Case for ConfigProviders as a separate variable @@ -400,7 +443,7 @@ public class AbstractConfigTest { Properties props = new Properties(); props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); - TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers)); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); assertEquals("testKey", config.originals().get("sasl.kerberos.key")); assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); MockFileConfigProvider.assertClosed(id); @@ -417,7 +460,7 @@ public class AbstractConfigTest { Properties props = new Properties(); props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}"); Map<?, ?> immutableMap = Collections.unmodifiableMap(props); - Map<String, ?> provMap = convertPropertiesToMap(providers); + Map<String, ?> provMap = Utils.castToStringObjectMap(providers); TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap); assertEquals("testKey", config.originals().get("sasl.kerberos.key")); MockFileConfigProvider.assertClosed(id); @@ -437,7 +480,7 @@ public class AbstractConfigTest { props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}"); props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}"); props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}"); - TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers)); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); assertEquals("testKey", config.originals().get("sasl.kerberos.key")); assertEquals("randomPassword", config.originals().get("sasl.kerberos.password")); assertEquals("testTruststoreKey", config.originals().get("sasl.truststore.key")); @@ -453,12 +496,33 @@ public class AbstractConfigTest { props.put("config.providers.file.class", "org.apache.kafka.common.config.provider.InvalidConfigProvider"); props.put("testKey", "${test:/foo/bar/testpath:testKey}"); - try { - new TestIndirectConfigResolution(props); - fail("Expected a config exception due to invalid props :" + props); - } catch (KafkaException e) { - // this is good - } + assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props)); + } + + @Test + public void testAutoConfigResolutionWithInvalidConfigProviderClassExcluded() { + String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider"; + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, ""); + // Test Case: Any config provider specified while the system property is empty + Properties props = new Properties(); + props.put("config.providers", "file"); + props.put("config.providers.file.class", invalidConfigProvider); + props.put("testKey", "${test:/foo/bar/testpath:testKey}"); + KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); + assertTrue(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY)); + } + + @Test + public void testAutoConfigResolutionWithInvalidConfigProviderClassIncluded() { + String invalidConfigProvider = "org.apache.kafka.common.config.provider.InvalidConfigProvider"; + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, invalidConfigProvider); + // Test Case: Invalid config provider specified, but is also included in the system property + Properties props = new Properties(); + props.put("config.providers", "file"); + props.put("config.providers.file.class", invalidConfigProvider); + props.put("testKey", "${test:/foo/bar/testpath:testKey}"); + KafkaException e = assertThrows(KafkaException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap())); + assertFalse(e.getMessage().contains(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY)); } @Test @@ -496,13 +560,15 @@ public class AbstractConfigTest { props.put("config.providers", "file"); props.put("config.providers.file.class", MockVaultConfigProvider.class.getName()); - TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers)); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); assertEquals("${file:/usr/kerberos:key}", config.originals().get("sasl.kerberos.key")); } @Test public void testConfigProviderConfigurationWithConfigParams() { - // Test Case: Valid Test Case With Multiple ConfigProviders as a separate variable + // should have no effect + System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName()); + // Test Case: Specify a config provider not allowed, but passed via the trusted providers argument Properties providers = new Properties(); providers.put("config.providers", "vault"); providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName()); @@ -512,7 +578,7 @@ public class AbstractConfigTest { props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}"); props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}"); props.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}"); - TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers)); + TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Utils.castToStringObjectMap(providers)); assertEquals("/usr/vault", config.originals().get("sasl.truststore.location")); } diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java index 477459895c5..053e594fbeb 100644 --- a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java +++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java @@ -76,7 +76,7 @@ public class MirrorClientConfig extends AbstractConfig { public static final String PRODUCER_CLIENT_PREFIX = "producer."; MirrorClientConfig(Map<?, ?> props) { - super(CONFIG_DEF, props, true); + super(CONFIG_DEF, props, Utils.castToStringObjectMap(props), true); } public ReplicationPolicy replicationPolicy() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java index de666c7bd60..c770a19624a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java @@ -125,7 +125,7 @@ public abstract class AbstractConnectCli<T extends WorkerConfig> { RestClient restClient = new RestClient(config); - ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, workerProps); + ConnectRestServer restServer = new ConnectRestServer(config.rebalanceTimeout(), restClient, config.originals()); restServer.initializeServer(); URI advertisedUrl = restServer.advertisedUrl(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index fe28918a29a..6de49ebd550 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -438,9 +438,15 @@ public class WorkerConfig extends AbstractConfig { @SuppressWarnings("this-escape") public WorkerConfig(ConfigDef definition, Map<String, String> props) { - super(definition, props); + super(definition, props, Utils.castToStringObjectMap(props), true); logInternalConverterRemovalWarnings(props); logPluginPathConfigProviderWarning(props); } + @Override + public Map<String, Object> originals() { + Map<String, Object> map = super.originals(); + map.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG); + return map; + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java index 0d6d06a4a59..4b8b5acf935 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java @@ -258,7 +258,7 @@ public abstract class RestServerConfig extends AbstractConfig { } protected RestServerConfig(ConfigDef configDef, Map<?, ?> props) { - super(configDef, props); + super(configDef, props, Utils.castToStringObjectMap(props), true); } // Visible for testing diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 0de444be202..58d16188d57 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -1119,6 +1119,7 @@ public class WorkerTest { Map<String, Object> connConfig = Collections.singletonMap("metadata.max.age.ms", "10000"); Map<String, String> expectedConfigs = new HashMap<>(workerProps); + expectedConfigs.remove(AbstractConfig.CONFIG_PROVIDERS_CONFIG); expectedConfigs.put("bootstrap.servers", "localhost:9092"); expectedConfigs.put("client.id", "testid"); expectedConfigs.put("metadata.max.age.ms", "10000"); diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 6cf8beca275..71956efa502 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -260,7 +260,7 @@ object BrokerApiVersionsCommand { config } - class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, false) + class AdminConfig(originals: Map[_,_]) extends AbstractConfig(AdminConfigDef, originals.asJava, Utils.castToStringObjectMap(originals.asJava), false) def create(props: Properties): AdminClient = create(props.asScala.toMap) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 71b163a2e22..ff0dc72b38e 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -486,7 +486,7 @@ class ZkPartitionStateMachine(config: KafkaConfig, } else { val (logConfigs, failed) = zkClient.getLogConfigs( partitionsWithNoLiveInSyncReplicas.iterator.map { case (partition, _) => partition.topic }.toSet, - config.originals() + config.extractLogConfigMap ) partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) => diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 2acbee89756..fac527ad054 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -196,7 +196,7 @@ object DynamicBrokerConfig { private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = { val props = new Properties - val config = new AbstractConfig(new ConfigDef(), propsOriginal, false) + val config = new AbstractConfig(new ConfigDef(), propsOriginal, Utils.castToStringObjectMap(propsOriginal), false) config.originals.forEach { (key, value) => if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) { props.put(key, value) @@ -739,13 +739,13 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok val originalLogConfig = logManager.currentDefaultConfig val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals) - newConfig.valuesFromThisConfig.forEach { (k, v) => + newConfig.extractLogConfigMap.forEach { (k, v) => if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => if (v == null) newBrokerDefaults.remove(configName) else - newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) + newBrokerDefaults.put(configName, v) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5ae8b236d04..e4a72ed8af2 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1633,7 +1633,7 @@ object KafkaConfig { } class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynamicConfigOverride: Option[DynamicBrokerConfig]) - extends AbstractConfig(KafkaConfig.configDef, props, doLog) with Logging { + extends AbstractConfig(KafkaConfig.configDef, props, Utils.castToStringObjectMap(props), doLog) with Logging { def this(props: java.util.Map[_, _]) = this(true, KafkaConfig.populateSynonyms(props), None) def this(props: java.util.Map[_, _], doLog: Boolean) = this(doLog, KafkaConfig.populateSynonyms(props), None) diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 10cbe589045..183e8657e0d 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -258,7 +258,7 @@ class PartitionStateMachineTest { .thenReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat, ResponseMetadata(0, 0)))) - when(mockZkClient.getLogConfigs(Set.empty, config.originals())) + when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap)) .thenReturn((Map(partition.topic -> new LogConfig(new Properties)), Map.empty[String, Exception])) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withPartitionEpoch(2) @@ -434,7 +434,7 @@ class PartitionStateMachineTest { } prepareMockToGetTopicPartitionsStatesRaw() def prepareMockToGetLogConfigs(): Unit = { - when(mockZkClient.getLogConfigs(Set.empty, config.originals())).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception])) + when(mockZkClient.getLogConfigs(Set.empty, config.extractLogConfigMap)).thenReturn((Map.empty[String, LogConfig], Map.empty[String, Exception])) } prepareMockToGetLogConfigs()