gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1312392628


##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class<? extends 
Enum<?>> enumClass) {
                 .toArray(String[]::new);
     }
 
+    /**
+     * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+     * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+     * @param cls the class to check; may not be null
+     * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+     *             "Interceptor", or even just "Class"; may be null
+     * @throws ConfigException if the class is not concrete
+     */
+    public static void ensureConcrete(Class<?> cls, String name) {
+        Objects.requireNonNull(cls);
+        if (isBlank(name))
+            name = "Class";
+        if (Modifier.isAbstract(cls.getModifiers())) {
+            String childClassNames = Stream.of(cls.getClasses())
+                    .filter(cls::isAssignableFrom)
+                    .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+                    .filter(c -> Modifier.isPublic(c.getModifiers()))
+                    .map(Class::getName)
+                    .collect(Collectors.joining(", "));
+            String message = Utils.isBlank(childClassNames) ?
+                    name + " is abstract and cannot be created." :
+                    name + " is abstract and cannot be created. Did you mean " 
+ childClassNames + "?";
+            throw new ConfigException(name, cls.getName(), message);

Review Comment:
   The first argument to this exception should be the config name, not the type 
name like it's used in the exception message and in the call-sites.



##########
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##########
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class<? extends 
Enum<?>> enumClass) {
                 .toArray(String[]::new);
     }
 
+    /**
+     * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+     * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+     * @param cls the class to check; may not be null
+     * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+     *             "Interceptor", or even just "Class"; may be null
+     * @throws ConfigException if the class is not concrete
+     */
+    public static void ensureConcrete(Class<?> cls, String name) {
+        Objects.requireNonNull(cls);
+        if (isBlank(name))
+            name = "Class";
+        if (Modifier.isAbstract(cls.getModifiers())) {
+            String childClassNames = Stream.of(cls.getClasses())
+                    .filter(cls::isAssignableFrom)

Review Comment:
   If `AbstractClass.Key` implemented the base interface, but didn't extend 
`AbstractClass`, this wouldn't show up in this listing. If this was 
`baseClass::isAssignableFrom`, it would find classes like that.
   
   I don't know how often people would create `Key` and `Value` inner classes 
without subclassing the outer class, since that seems to be the reason people 
use the inner classes in the first place. But if you change this to take the 
`baseClass` and check `baseClass.isAssignableFrom(cls)` like both of the 
call-sites, you could change this filter statement too. I don't think this is a 
big deal, so feel free to leave this implementation as-is.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (value == null) {
+            // The value will be null if the class couldn't be found; no point 
in performing follow-up validation
+            return;
+        }
+
+        Class<?> cls = (Class<?>) value;
+        try {
+            Object o = cls.getDeclaredConstructor().newInstance();
+            Utils.maybeCloseQuietly(o, o + " (instantiated for preflight 
validation");
+        } catch (NoSuchMethodException e) {
+            throw new ConfigException(name, cls.getName(), "Could not find a 
public no-argument constructor for class" + (e.getMessage() != null ? ": " + 
e.getMessage() : ""));
+        } catch (ReflectiveOperationException | RuntimeException e) {
+            throw new ConfigException(name, cls.getName(), "Could not 
instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+        }

Review Comment:
   I think you can also get a LinkageError from calling the constructor the 
first time. I think that would get past this try-catch.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (value == null) {
+            // The value will be null if the class couldn't be found; no point 
in performing follow-up validation
+            return;
+        }
+
+        Class<?> cls = (Class<?>) value;
+        try {
+            Object o = cls.getDeclaredConstructor().newInstance();
+            Utils.maybeCloseQuietly(o, o + " (instantiated for preflight 
validation");
+        } catch (NoSuchMethodException e) {
+            throw new ConfigException(name, cls.getName(), "Could not find a 
public no-argument constructor for class" + (e.getMessage() != null ? ": " + 
e.getMessage() : ""));
+        } catch (ReflectiveOperationException | RuntimeException e) {

Review Comment:
   This branch will include IllegalAccessException which is thrown when the 
constructor exists but is not public afaik.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to