[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316388557


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)

Review Comment:
   > I guess we could add that check as well and have something like 
Utils::ensureConcreteSubclass--how does that sound?
   
   Yeah I imagine a `ensureConcreteSubclass(Class cls, Class baseClass)` 
that:
   1. asserts that cls is concrete and baseClass.isAssignableFrom(cls) is true
   2. if not valid, suggests child classes that are concrete 
and`baseClass::isAssignableFrom` is true.
   
   I don't think that it would compose well with the current `ensureConcrete` 
method though, so it would probably replace what is there already.
   
   > I'm on the fence about it potentially doing too much for a utility method, 
but it does provide user-facing advantages.
   
   I thought that about the child class suggestion logic, but relented for the 
same reason. It significantly complicates this method which could otherwise be 
a single if condition, but that complication could help someone quickly resolve 
an obvious typo.
   
   I don't think that asserting `baseClass.isAssignableFrom(cls)` is too much 
for this utility method, because we should always have a baseClass in mind when 
defining a `CLASS` configuration. Users of the 
`AbstractConfig::getConfiguredInstance` methods already need to provide a 
baseClass for subclass checking and type safety at runtime, this utility method 
would help users to remember the same check at validation time.
   
   And worst-case, someone can just specify Object as the baseClass, and 
effectively disable the baseClass filtering.
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362502


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)
+.filter(c -> !Modifier.isAbstract(c.getModifiers()))
+.filter(c -> Modifier.isPublic(c.getModifiers()))
+.map(Class::getName)
+.collect(Collectors.joining(", "));
+String message = Utils.isBlank(childClassNames) ?
+name + " is abstract and cannot be created." :
+name + " is abstract and cannot be created. Did you mean " 
+ childClassNames + "?";
+throw new ConfigException(name, cls.getName(), message);

Review Comment:
   I like this simplification, and it makes sense to generalize the error 
message as part of promoting this to the Utils class. `parseForValidate` will 
still attribute the error to the correct key/value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-09-05 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1316362060


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends 
AutoCloseable {
 void close();
 }
 
+/**
+ * Closes {@code maybeCloseable} if it implements the {@link 
AutoCloseable} interface,
+ * and if an exception is thrown, it is logged at the WARN level.
+ * Be cautious when passing method references as an argument. For 
example:
+ * 
+ * {@code closeQuietly(task::stop, "source task");}
+ * 
+ * Although this method gracefully handles null {@link AutoCloseable} 
objects, attempts to take a method
+ * reference from a null object will result in a {@link 
NullPointerException}. In the example code above,
+ * it would be the caller's responsibility to ensure that {@code task} was 
non-null before attempting to
+ * use a method reference from it.
+ */
+public static void maybeCloseQuietly(Object maybeCloseable, String name) {

Review Comment:
   I found more use-cases for this today, so  on adding this helper.



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1090,6 +1090,23 @@ public interface UncheckedCloseable extends 
AutoCloseable {
 void close();
 }
 
+/**
+ * Closes {@code maybeCloseable} if it implements the {@link 
AutoCloseable} interface,
+ * and if an exception is thrown, it is logged at the WARN level.
+ * Be cautious when passing method references as an argument. For 
example:
+ * 
+ * {@code closeQuietly(task::stop, "source task");}
+ * 
+ * Although this method gracefully handles null {@link AutoCloseable} 
objects, attempts to take a method
+ * reference from a null object will result in a {@link 
NullPointerException}. In the example code above,
+ * it would be the caller's responsibility to ensure that {@code task} was 
non-null before attempting to
+ * use a method reference from it.
+ */
+public static void maybeCloseQuietly(Object maybeCloseable, String name) {

Review Comment:
   I found more use-cases for this today, so  on adding this helper.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14304: KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes

2023-08-31 Thread via GitHub


gharris1727 commented on code in PR #14304:
URL: https://github.com/apache/kafka/pull/14304#discussion_r1312392628


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)
+.filter(c -> !Modifier.isAbstract(c.getModifiers()))
+.filter(c -> Modifier.isPublic(c.getModifiers()))
+.map(Class::getName)
+.collect(Collectors.joining(", "));
+String message = Utils.isBlank(childClassNames) ?
+name + " is abstract and cannot be created." :
+name + " is abstract and cannot be created. Did you mean " 
+ childClassNames + "?";
+throw new ConfigException(name, cls.getName(), message);

Review Comment:
   The first argument to this exception should be the config name, not the type 
name like it's used in the exception message and in the call-sites.



##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1519,6 +1537,32 @@ public static String[] enumOptions(Class> enumClass) {
 .toArray(String[]::new);
 }
 
+/**
+ * Ensure that the class is concrete (i.e., not abstract). If it is, throw 
a {@link ConfigException}
+ * with a friendly error message suggesting a list of concrete child 
subclasses (if any are known).
+ * @param cls the class to check; may not be null
+ * @param name the name of the type of class to use in the error message; 
e.g., "Transform",
+ * "Interceptor", or even just "Class"; may be null
+ * @throws ConfigException if the class is not concrete
+ */
+public static void ensureConcrete(Class cls, String name) {
+Objects.requireNonNull(cls);
+if (isBlank(name))
+name = "Class";
+if (Modifier.isAbstract(cls.getModifiers())) {
+String childClassNames = Stream.of(cls.getClasses())
+.filter(cls::isAssignableFrom)

Review Comment:
   If `AbstractClass.Key` implemented the base interface, but didn't extend 
`AbstractClass`, this wouldn't show up in this listing. If this was 
`baseClass::isAssignableFrom`, it would find classes like that.
   
   I don't know how often people would create `Key` and `Value` inner classes 
without subclassing the outer class, since that seems to be the reason people 
use the inner classes in the first place. But if you change this to take the 
`baseClass` and check `baseClass.isAssignableFrom(cls)` like both of the 
call-sites, you could change this filter statement too. I don't think this is a 
big deal, so feel free to leave this implementation as-is.



##
connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+if (value == null) {
+// The value