mimaison commented on code in PR #12010: URL: https://github.com/apache/kafka/pull/12010#discussion_r864068803
##########
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AdminClientConfigTest {
+
+    @Test
+    public void testInvalidSaslMechanism() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+        configs.put(SaslConfigs.SASL_MECHANISM, null);
+        ConfigException ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
+
+        configs.put(SaslConfigs.SASL_MECHANISM, "");
+        ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
+
+        configs.put(SaslConfigs.SASL_MECHANISM, " ");
+        ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));

Review Comment:
I agree with @C0urante, we don't need to have that test case in all these classes.

##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##########
@@ -49,15 +53,15 @@ public void testClusterConfigProperties() {
             "clusters", "a, b",
             "a.bootstrap.servers", "servers-one",
             "b.bootstrap.servers", "servers-two",
-            "security.protocol", "SASL",
+            "security.protocol", "SSL",

Review Comment:
We currently don't have checks in any of the `*Config` classes (this is what this PR is addressing). But when an actual client is created, the value of `security.protocol` is used, and only at that point do you get an exception if a bad value is set. So MirrorMaker can't run with `security.protocol=SASL`.
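To make the timing difference concrete, here is a minimal, self-contained sketch in plain Java. It does not use Kafka's classes: `DemoSecurityProtocol`, `ProtocolConfigException`, and `EagerProtocolCheck` are hypothetical stand-ins, and the error text only imitates the style of a config validation message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka's SecurityProtocol enum (assumption, not the real class).
enum DemoSecurityProtocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

// Hypothetical stand-in for a config validation error.
class ProtocolConfigException extends RuntimeException {
    ProtocolConfigException(String message) { super(message); }
}

class EagerProtocolCheck {
    static final String KEY = "security.protocol";

    // Late check: the raw string is only interpreted when a client needs it,
    // so a bad value surfaces as an IllegalArgumentException from Enum.valueOf.
    static DemoSecurityProtocol parseLate(Map<String, String> configs) {
        return DemoSecurityProtocol.valueOf(configs.get(KEY));
    }

    // Eager check: reject unknown names as soon as the config is validated.
    static void validateEagerly(Map<String, String> configs) {
        List<String> allowed = new ArrayList<>();
        for (DemoSecurityProtocol p : DemoSecurityProtocol.values()) {
            allowed.add(p.name());
        }
        String value = configs.get(KEY);
        if (!allowed.contains(value)) {
            throw new ProtocolConfigException("Invalid value " + value + " for configuration "
                    + KEY + ": String must be one of: " + String.join(", ", allowed));
        }
    }

    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put(KEY, "SASL"); // not a valid protocol name
        try {
            validateEagerly(configs);
        } catch (ProtocolConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The eager variant names the offending key and the accepted values, whereas the late variant only reports a missing enum constant from deep inside client construction.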
For MirrorMaker, you get:
```
[2022-05-03 20:32:41,008] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:545)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
    at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
    at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
    at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL
    at java.base/java.lang.Enum.valueOf(Enum.java:240)
    at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
    at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:516)
```
With this PR, you would get:
```
[2022-05-03 20:38:13,763] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
org.apache.kafka.common.config.ConfigException: Invalid value SASL for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:961)
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:499)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:146)
    at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:234)
    at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
    at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
    at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
    at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
```

##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1139,7 +1139,7 @@ object KafkaConfig {
       .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours * 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
       /************* Authorizer Configuration ***********/
-      .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
+      .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)

Review Comment:
I suspect most people use a properties file to configure their brokers and in that case I'm not sure if a `STRING` configuration can ever be `null`.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -601,11 +607,16 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
     protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+        // validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value

Review Comment:
Do we need these extra checks?
If I don't set serdes with trunk, I get an error:
```
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:641)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
```
`AbstractConfig` should enforce that each field has a valid value. We only need to perform extra validation when multiple fields must have compatible values.
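The kind of check that a single-field validator cannot express is one that spans several fields. As a rough illustration, the SASL case exercised by `testInvalidSaslMechanism` could be sketched like this in plain Java. The class names `CrossFieldCheck` and `SaslConfigException`, the `SASL_` prefix test, and the message wording are assumptions made for this example, not the PR's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config validation error (not Kafka's ConfigException).
class SaslConfigException extends RuntimeException {
    SaslConfigException(String message) { super(message); }
}

class CrossFieldCheck {
    static final String PROTOCOL = "security.protocol";
    static final String MECHANISM = "sasl.mechanism";

    // Cross-field rule: a SASL protocol requires a non-blank mechanism.
    static void validate(Map<String, Object> configs) {
        Object protocol = configs.get(PROTOCOL);
        boolean usesSasl = protocol != null && protocol.toString().startsWith("SASL_");
        if (!usesSasl) {
            return; // the mechanism is irrelevant for non-SASL protocols
        }
        Object mechanism = configs.get(MECHANISM);
        if (mechanism == null || mechanism.toString().trim().isEmpty()) {
            throw new SaslConfigException(MECHANISM + " must be non-empty when "
                    + PROTOCOL + " is " + protocol);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(PROTOCOL, "SASL_PLAINTEXT");
        configs.put(MECHANISM, " "); // blank mechanism, rejected by the rule above
        try {
            validate(configs);
        } catch (SaslConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A missing required key such as `key.deserializer`, by contrast, concerns one field only, which is why the existing per-field parsing already reports it.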