mimaison commented on code in PR #12010:
URL: https://github.com/apache/kafka/pull/12010#discussion_r864068803


##########
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AdminClientConfigTest {
+
+    @Test
+    public void testInvalidSaslMechanism() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8121");
+        configs.put(SaslConfigs.SASL_MECHANISM, null);
+        ConfigException ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
+
+        configs.put(SaslConfigs.SASL_MECHANISM, "");
+        ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));
+
+        configs.put(SaslConfigs.SASL_MECHANISM, " ");
+        ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(SaslConfigs.SASL_MECHANISM));

Review Comment:
   I agree with @C0urante, we don't need to have that test case in all these classes.



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java:
##########
@@ -49,15 +53,15 @@ public void testClusterConfigProperties() {
             "clusters", "a, b",
             "a.bootstrap.servers", "servers-one",
             "b.bootstrap.servers", "servers-two",
-            "security.protocol", "SASL",
+            "security.protocol", "SSL",

Review Comment:
   We currently don't have checks in any of the `*Config` classes (this is what this PR addresses). But when an actual client is created, the value of `security.protocol` is used, and only at that point do you get an exception if a bad value is set. So MirrorMaker can't run with `security.protocol=SASL`.
   
   For MirrorMaker, you get:
   ```
   [2022-05-03 20:32:41,008] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
   org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:545)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
        at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
        at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
   Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.kafka.common.security.auth.SecurityProtocol.SASL
        at java.base/java.lang.Enum.valueOf(Enum.java:240)
        at org.apache.kafka.common.security.auth.SecurityProtocol.valueOf(SecurityProtocol.java:26)
        at org.apache.kafka.common.security.auth.SecurityProtocol.forName(SecurityProtocol.java:72)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:103)
        at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:516)
   ```
   
   With this PR, you would get:
   ```
   [2022-05-03 20:38:13,763] ERROR Stopping due to error (org.apache.kafka.connect.mirror.MirrorMaker:313)
   org.apache.kafka.common.config.ConfigException: Invalid value SASL for configuration security.protocol: String must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
        at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:961)
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:499)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:146)
        at org.apache.kafka.clients.admin.AdminClientConfig.<init>(AdminClientConfig.java:234)
        at org.apache.kafka.clients.admin.Admin.create(Admin.java:143)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:52)
        at org.apache.kafka.connect.mirror.MirrorMaker.addHerder(MirrorMaker.java:236)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:137)
        at org.apache.kafka.connect.mirror.MirrorMaker.<init>(MirrorMaker.java:149)
        at org.apache.kafka.connect.mirror.MirrorMaker.main(MirrorMaker.java:300)
   ```
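The earlier, clearer error comes from attaching an allowed-values validator to the config definition, so the bad value fails at parse time instead of deep inside client construction. Below is an illustrative pure-JDK sketch of that idea; the class and method names here are mine, not Kafka's actual `ConfigDef.ValidString` API:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of an allowed-values check in the spirit of ConfigDef.ValidString:
// the value is validated when the config object is built, not when a client
// is later created from it.
public class ValidStringSketch {
    static final List<String> ALLOWED =
            Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL");

    // Returns the value if allowed, otherwise fails fast with a message
    // that names the offending configuration key and the permitted values.
    static String ensureValid(String name, String value) {
        if (!ALLOWED.contains(value)) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration " + name
                + ": String must be one of: " + String.join(", ", ALLOWED));
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(ensureValid("security.protocol", "SSL")); // accepted
        try {
            ensureValid("security.protocol", "SASL"); // rejected at parse time
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the design is simply that the validation runs inside the config constructor, so every component that parses the config (MirrorMaker, Connect, plain clients) gets the same early failure.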



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1139,7 +1139,7 @@ object KafkaConfig {
      .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours * 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
 
      /************* Authorizer Configuration ***********/
-      .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
+      .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)

Review Comment:
   I suspect most people use a properties file to configure their brokers, and in that case I'm not sure a `STRING` configuration can ever be `null`.

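A quick pure-JDK check supports this: `java.util.Properties` extends `Hashtable`, which rejects `null` values outright, so a broker configured from a properties file can only ever omit a key, never set it to `null`. The key name below is just the one under discussion; the helper method is mine:

```java
import java.util.Properties;

// Demonstrates that Properties cannot hold a null value: an absent key simply
// yields null from getProperty, but storing null throws NullPointerException
// because the backing Hashtable disallows null keys and values.
public class PropertiesNullCheck {
    static boolean canStoreNull() {
        Properties props = new Properties();
        try {
            props.setProperty("authorizer.class.name", null);
            return true;
        } catch (NullPointerException e) {
            return false; // Hashtable.put rejects null values
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Absent key: getProperty returns null, but no null is ever *stored*.
        System.out.println(props.getProperty("authorizer.class.name"));
        System.out.println(canStoreNull());
    }
}
```

So a `NonNullValidator` would only matter for callers that build the config map programmatically and put an explicit `null` into it.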


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -601,11 +607,16 @@ private void maybeOverrideClientId(Map<String, Object> configs) {
     protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
                                                                     Deserializer<?> keyDeserializer,
                                                                     Deserializer<?> valueDeserializer) {
+        // validate deserializer configuration; if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value

Review Comment:
   Do we need these extra checks?
   If I don't set serdes with trunk, I get an error:
   ```
   Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:493)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:113)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:133)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:641)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
   ```
   
   `AbstractConfig` should enforce that each individual field has a valid value. We only need to perform extra validation when multiple fields must have compatible values.
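The per-field part of that argument can be sketched in plain Java (illustrative names, not Kafka's actual `ConfigDef` API): a required key with no default already fails on its own at parse time, so a null-deserializer check adds nothing unless it relates several fields to each other.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-field required-key validation: a key declared required with
// no default must appear in the supplied map, otherwise parsing fails
// immediately with a message naming the missing key.
public class RequiredConfigSketch {
    static Object parseRequired(Map<String, Object> configs, String key) {
        if (!configs.containsKey(key)) {
            throw new IllegalArgumentException(
                "Missing required configuration \"" + key + "\" which has no default value.");
        }
        return configs.get(key);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092");
        try {
            parseRequired(configs, "key.deserializer"); // fails: key never set
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Cross-field checks (e.g. "these two settings must agree") are the genuinely separate case that a post-parse validation step has to cover.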



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
