Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante merged PR #14309:
URL: https://github.com/apache/kafka/pull/14309


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2098726331

   Thanks @gharris1727. I've reverted the changes to `Utils.java` and verified locally with tests. Everything else looked okay on the previous CI run, so I'm going to merge.





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-07 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1592678706


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
+     *                          may be null, in which case no validation will be performed under the assumption that the
+     *                          connector will use inherit the converter settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., {@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for the plugin type
+     *                       in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try {
+            ConfigDef configDef;
+            try {
+                configDef = configDefAccessor.apply(pluginInstance);
+            } catch (RuntimeException e) {
+                log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
+                pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+                return null;
+            }
+            if (configDef == null) {
+                log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass);
+                // Older versions of Connect didn't do any converter validation.
+                // Even though converters are technically required to return a non-null ConfigDef object from their config() method,
+                // we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement,
+                // can be used successfully with a connector.
+                return null;
+            }
+            final String 

Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-06 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1591196797


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -16,6 +16,15 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.lang.reflect.Modifier;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteOrder;
+import java.nio.file.StandardOpenOption;
+import java.util.AbstractMap;
+import java.util.EnumSet;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;

Review Comment:
   I think this should be reverted; we don't touch anything else in this file.




Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-02 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2091927044

   Thanks @gharris1727, can you give this another round?





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591019


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName,
         ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
                 connName, connectorType, connectorClass, clientConfigs, clientType);
         List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-        if (configValues != null) {
-            for (ConfigValue validatedConfigValue : configValues) {
-                ConfigKey configKey = configKeys.get(validatedConfigValue.name());
-                ConfigKeyInfo configKeyInfo = null;
-                if (configKey != null) {
-                    if (configKey.group != null) {
-                        groups.add(configKey.group);
-                    }
-                    configKeyInfo = convertConfigKey(configKey, prefix);
-                }
 
-                ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
-                                                          validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-                if (!configValue.errorMessages().isEmpty()) {
-                    errorCount++;
+        return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
+    }
+
+    private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
+        int errorCount = 0;
+        Set<String> groups = new LinkedHashSet<>();
+        List<ConfigInfo> configInfos = new ArrayList<>();
+
+        if (configValues == null) {

Review Comment:
   `ConfigDef::validate` is non-final, and plugin instances may return a subclass from their `config` methods whose `validate` implementation could return null.
   
   I acknowledge that this is extremely unlikely, but this null guard seems like the best way to handle that scenario, as opposed to, e.g., throwing an error and causing a 500 response to be returned. Thoughts?
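   A minimal, self-contained sketch of the null-guard reasoning above, in plain Java with no Kafka dependency. `Definition`, `Result`, `NullReturningDefinition`, and `countErrors` are hypothetical stand-ins, not Kafka's `ConfigDef`/`ConfigValue` API. Because `validate` can be overridden, a subclass can return null, and the caller treats that as "no results" instead of letting an exception escape (which, per the comment, would otherwise surface as a 500 response).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class NullGuardSketch {

    // Hypothetical stand-in for a validated config entry (not Kafka's ConfigValue).
    static final class Result {
        final String name;
        final List<String> errorMessages;

        Result(String name, List<String> errorMessages) {
            this.name = name;
            this.errorMessages = errorMessages;
        }
    }

    // Hypothetical stand-in for ConfigDef: validate() is public and non-final,
    // so a plugin can hand back a subclass that overrides it.
    static class Definition {
        public List<Result> validate(Map<String, String> props) {
            List<Result> results = new ArrayList<>();
            for (String key : props.keySet()) {
                results.add(new Result(key, Collections.emptyList()));
            }
            return results;
        }
    }

    // A misbehaving subclass that returns null instead of a list.
    static class NullReturningDefinition extends Definition {
        @Override
        public List<Result> validate(Map<String, String> props) {
            return null;
        }
    }

    // Counts entries that carry error messages. A null list is treated as
    // "nothing to report" rather than allowed to trigger a NullPointerException.
    static int countErrors(List<Result> results) {
        if (results == null) {
            return 0;
        }
        int errorCount = 0;
        for (Result result : results) {
            if (!result.errorMessages.isEmpty()) {
                errorCount++;
            }
        }
        return errorCount;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("converter.type", "key");
        System.out.println(countErrors(new Definition().validate(props)));
        System.out.println(countErrors(new NullReturningDefinition().validate(props)));
        System.out.println(countErrors(List.of(new Result("bad.key", List.of("Invalid value")))));
    }
}
```

   The trade-off is the one raised in the comment: a silent empty result hides a misbehaving plugin, but it keeps the validation endpoint responsive.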






Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591574


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName,
         ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
                 connName, connectorType, connectorClass, clientConfigs, clientType);
         List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-        if (configValues != null) {
-            for (ConfigValue validatedConfigValue : configValues) {
-                ConfigKey configKey = configKeys.get(validatedConfigValue.name());
-                ConfigKeyInfo configKeyInfo = null;
-                if (configKey != null) {
-                    if (configKey.group != null) {
-                        groups.add(configKey.group);
-                    }
-                    configKeyInfo = convertConfigKey(configKey, prefix);
-                }
 
-                ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
-                                                          validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-                if (!configValue.errorMessages().isEmpty()) {
-                    errorCount++;
+        return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
+    }
+
+    private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
+        int errorCount = 0;
+        Set<String> groups = new LinkedHashSet<>();
+        List<ConfigInfo> configInfos = new ArrayList<>();
+
+        if (configValues == null) {
+            return new ConfigInfos("", 0, new ArrayList<>(groups), configInfos);
+        }
+
+        for (ConfigValue validatedConfigValue : configValues) {
+            ConfigKey configKey = configKeys.get(validatedConfigValue.name());
+            ConfigKeyInfo configKeyInfo = null;
+            if (configKey != null) {
+                if (configKey.group != null) {
+                    groups.add(configKey.group);
                 }
-                ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
-                configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
+                configKeyInfo = convertConfigKey(configKey, prefix);
+            }
+
+            ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
+                    validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
+            if (configValue.errorMessages().size() > 0) {
+                errorCount++;
             }
+            ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
+            configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
         }
-        return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
+        return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos);

Review Comment:
   Yeah, it's a little weird with the empty string here. Hopefully it's fine for now, but if we continue augmenting and refactoring this class, I agree that it might be worth changing.






Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591136


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
+     *                          may be null, in which case no validation will be performed under the assumption that the
+     *                          connector will use inherit the converter settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., {@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for the plugin type
+     *                       in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
Review Comment:
   I think this is actually correct. All calls to `validateConverterConfig` take place within a `LoaderSwap` that causes the connector's classloader to be used, which, unless I'm mistaken, matches the behavior when instantiating tasks (loader swap [here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L654), converter instantiation [here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L666-L674)). It's true that `Plugins::newConverter` and `Plugins::newHeaderConverter` are used instead of `Utils::newInstance` when starting tasks, but when the `Plugins` methods are invoked with `classLoaderUsage` set to `CURRENT_CLASSLOADER`, no classloader swapping takes place, so the connector loader is still used.
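   The classloader behavior described above can be sketched with the JDK alone. `ContextLoaderSwap` and `newInstance` below are hypothetical helpers that only approximate the idea behind Connect's `LoaderSwap` and a current-classloader lookup; they are not Kafka's classes. The sketch assumes, as the comment implies, that instantiation resolves classes through the thread context classloader, so whatever loader the caller swapped in is the one that gets used.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderSwapSketch {

    // Hypothetical helper: installs a classloader as the thread's context
    // classloader and restores the previous one on close(). It only
    // approximates the idea behind Connect's LoaderSwap; it is not that class.
    static final class ContextLoaderSwap implements AutoCloseable {
        private final ClassLoader saved;

        ContextLoaderSwap(ClassLoader loader) {
            Thread current = Thread.currentThread();
            this.saved = current.getContextClassLoader();
            current.setContextClassLoader(loader);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(saved);
        }
    }

    // Hypothetical helper: loads and instantiates a class through whatever
    // context classloader is currently installed. It performs no swap of its
    // own, so the caller's swap decides which loader is used.
    static <T> T newInstance(String className, Class<T> base) throws ReflectiveOperationException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        Class<? extends T> cls = Class.forName(className, true, loader).asSubclass(base);
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for a plugin-specific loader; with no URLs it just delegates to its parent.
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0], original)) {
            try (ContextLoaderSwap swap = new ContextLoaderSwap(pluginLoader)) {
                System.out.println("swapped: " + (Thread.currentThread().getContextClassLoader() == pluginLoader));
                StringBuilder sb = newInstance("java.lang.StringBuilder", StringBuilder.class);
                System.out.println(sb.append("instantiated via context loader"));
            }
            System.out.println("restored: " + (Thread.currentThread().getContextClassLoader() == original));
        }
    }
}
```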




Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-05-01 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586590857


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -562,8 +709,13 @@ ConfigInfos validateConnectorConfig(
             configKeys.putAll(configDef.configKeys());
             allGroups.addAll(configDef.groups());
             configValues.addAll(config.configValues());
-            ConfigInfos configInfos =  generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
 
+            // do custom converter-specific validation
+            ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(connectorProps, validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG));
+            ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(connectorProps, validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG));
+            ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(connectorProps, validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG));

Review Comment:
   Yeah, the alternative was to pass in the entire `validatedConnectorConfig` and let the various `validateXxxConverterConfig` methods pull out the relevant `ConfigValue` field. But this seemed strange, considering those methods only require a single `ConfigValue` object.
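   A small sketch of the design choice described above: the caller looks up the single relevant entry and passes only that to a per-converter check. `Value`, `shouldValidate`, and the property-name constants are hypothetical illustrations, not Kafka's API. The skip condition mirrors the early-return guard in the `validateConverterConfig` diff quoted in this thread (no converter class configured, no config value supplied, or errors already recorded).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SingleValueSketch {

    // Hypothetical stand-in for a validated config entry (not Kafka's ConfigValue).
    static final class Value {
        final String name;
        final List<String> errorMessages = new ArrayList<>();

        Value(String name) {
            this.name = name;
        }
    }

    // Hypothetical property names, used for illustration only.
    static final String KEY_CONVERTER = "key.converter";
    static final String VALUE_CONVERTER = "value.converter";

    // Receives only the single entry it needs, not the whole validated map.
    // Returns true when converter-specific validation should proceed.
    static boolean shouldValidate(Map<String, String> connectorProps, String property, Value value) {
        String pluginClass = connectorProps.get(property);
        return pluginClass != null && value != null && value.errorMessages.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(KEY_CONVERTER, "com.example.MyConverter");
        props.put(VALUE_CONVERTER, "com.example.Missing");

        Map<String, Value> validated = new HashMap<>();
        validated.put(KEY_CONVERTER, new Value(KEY_CONVERTER));
        Value valueEntry = new Value(VALUE_CONVERTER);
        valueEntry.errorMessages.add("Class com.example.Missing could not be found");
        validated.put(VALUE_CONVERTER, valueEntry);

        // The caller extracts the relevant entry before each call.
        System.out.println(shouldValidate(props, KEY_CONVERTER, validated.get(KEY_CONVERTER)));
        System.out.println(shouldValidate(props, VALUE_CONVERTER, validated.get(VALUE_CONVERTER)));
    }
}
```

   Passing one entry keeps each method's dependencies explicit; the cost is that the caller must know which key belongs to which converter.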



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-30 Thread via GitHub


gharris1727 commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1585294415


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName,
         ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
                 connName, connectorType, connectorClass, clientConfigs, clientType);
         List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
-        if (configValues != null) {
-            for (ConfigValue validatedConfigValue : configValues) {
-                ConfigKey configKey = configKeys.get(validatedConfigValue.name());
-                ConfigKeyInfo configKeyInfo = null;
-                if (configKey != null) {
-                    if (configKey.group != null) {
-                        groups.add(configKey.group);
-                    }
-                    configKeyInfo = convertConfigKey(configKey, prefix);
-                }
 
-                ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
-                                                          validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
-                if (!configValue.errorMessages().isEmpty()) {
-                    errorCount++;
+        return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
+    }
+
+    private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
+        int errorCount = 0;
+        Set<String> groups = new LinkedHashSet<>();
+        List<ConfigInfo> configInfos = new ArrayList<>();
+
+        if (configValues == null) {

Review Comment:
   I think this null check is only relevant when the value comes from `overridePolicy.validate`; in `validateConverterConfig`, I think the `ConfigDef#validate` call will always return a non-null value.




Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-30 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2085912341

   @gharris1727 I've resolved the merge conflicts again; can you please take a look when you get a chance?





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-15 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2057408474

   Hi @gharris1727, sorry for the delay. I've resolved the merge conflicts; let me know what you think if you have a moment.





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-02-05 Thread via GitHub


gharris1727 commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1928574152

   Hi @C0urante, could you resolve the merge conflicts now that #14304 is merged?





Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-01-29 Thread via GitHub


github-actions[bot] commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1916012508

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2023-10-31 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-1787948593

   @gharris1727, similar to https://github.com/apache/kafka/pull/14304: 
apologies for the delay. I've addressed all outstanding concerns, so this is 
ready for another round when you have time.



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2023-10-31 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1378105490


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue converterConfigValue,
+            Class<T> converterInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String converterName,
+            String converterProperty,
+            ConverterType converterType

Review Comment:
   I like leaving room for a potential future refactor that uses this method 
for more than just converters, but there is one more part here that's 
converter-specific: we permit converters to return null `ConfigDef` objects 
from their `config` methods. We don't allow, e.g., SMTs or predicates to do the 
same.
   
   To save time on potential future refactoring, I've renamed every parameter 
and variable in this method to use "plugin" instead of "converter", but I've 
kept the method name itself (`validateConverterConfig`) the same, and have 
retained some converter-specific language in log messages. I've also used a 
`Map defaultProperties` instead of the functional programming 
approach in order to be a little more concise with existing caller code; hope 
this isn't too controversial. Let me know if this strikes the right balance!
   
   I've also added a Javadoc to this method with examples for most parameters, 
since the parameter names only helped so much and even I was having trouble 
recalling how this method worked after a few months without reading it.
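   For illustration only, here is a minimal JDK-only sketch of the 
converter-specific rule described above, under a simplified model: a generic 
helper layers a `defaultProperties` map underneath the user-supplied 
properties and accepts a null definition only when the caller says that 
plugin type permits it. `PluginDef`, `ConverterLike`, `validatePlugin`, and 
`allowNullDef` are hypothetical stand-ins, not Kafka's `ConfigDef` or the 
actual signature of `validateConverterConfig`, and the property names are 
illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PluginValidationSketch {

    // Hypothetical stand-in for a plugin's configuration definition: just the
    // names of the properties it requires. This is not Kafka's ConfigDef.
    interface PluginDef {
        List<String> requiredKeys();
    }

    // Hypothetical converter-like plugin whose config() is allowed to return null.
    interface ConverterLike {
        PluginDef config();
    }

    // Hypothetical generic helper. Defaults are layered underneath the
    // user-supplied properties; a null definition is accepted only when
    // allowNullDef is true (the converter case) and reported as an error otherwise.
    static <T> List<String> validatePlugin(
            T plugin,
            Function<T, PluginDef> defAccessor,
            Map<String, String> pluginProps,
            Map<String, String> defaultProperties,
            boolean allowNullDef) {
        List<String> errors = new ArrayList<>();
        PluginDef def = defAccessor.apply(plugin);
        if (def == null) {
            if (!allowNullDef) {
                errors.add("Plugin returned a null config definition");
            }
            // Nothing left to validate against.
            return errors;
        }
        Map<String, String> merged = new HashMap<>(defaultProperties);
        merged.putAll(pluginProps); // user-supplied values override defaults
        for (String key : def.requiredKeys()) {
            if (!merged.containsKey(key)) {
                errors.add("Missing required property: " + key);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        ConverterLike lenient = () -> null;
        ConverterLike strict = () -> () -> List.of("converter.type", "schemas.enable");
        Map<String, String> defaults = Map.of("converter.type", "key");

        // A null definition is fine when allowed (the converter case).
        System.out.println(validatePlugin(lenient, ConverterLike::config, Map.of(), defaults, true));
        // prints []

        // The same null definition is an error when not allowed (e.g. SMTs, predicates).
        System.out.println(validatePlugin(lenient, ConverterLike::config, Map.of(), defaults, false));
        // prints [Plugin returned a null config definition]

        // "converter.type" is satisfied by the defaults; "schemas.enable" is missing.
        System.out.println(validatePlugin(strict, ConverterLike::config, Map.of(), defaults, true));
        // prints [Missing required property: schemas.enable]
    }
}
```

   Passing the defaults as a plain map keeps call sites short, which matches 
the conciseness argument above; the boolean flag is just one simple way to 
model "converters may return null, other plugin types may not".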



Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2023-10-31 Thread via GitHub


C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1378078025


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue converterConfigValue,
+            Class<T> converterInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String converterName,
+            String converterProperty,
+            ConverterType converterType
+    ) {
+        String converterClass = connectorConfig.get(converterProperty);
+
+        if (converterClass == null
+                || converterConfigValue == null
+                || !converterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T converterInstance;
+        try {
+            converterInstance = Utils.newInstance(converterClass, converterInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", converterName, converterClass, e);
+            converterConfigValue.addErrorMessage("Failed to load class " + converterClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try (Utils.UncheckedCloseable close = () -> Utils.maybeCloseQuietly(converterInstance, converterName + " " + converterClass);) {

Review Comment:
   Thanks for the exhaustive analysis; I see the error of my ways now.
   
   I've pushed a new commit migrating the `Utils::closeQuietly` call to a 
`finally` block. The `MaybeCloseable` idea is fascinating but I don't think the 
additional cognitive burden is worth the small bump in ergonomics for this 
change. I do think it might be worth it to apply it across the whole code base 
(sort of like how https://github.com/apache/kafka/pull/13185 forces the whole 
code base to be aware of plugin-thrown exceptions, at least for `Connector` 
instances).
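   A minimal sketch of the `finally`-based cleanup described above, assuming 
plugin instances that may or may not implement `AutoCloseable`. The 
`maybeCloseQuietly` helper, the `FaultyPlugin` class, and the `events` list 
are hypothetical stand-ins written for this example (a real herder would log 
rather than record events); they are not Kafka's `Utils` implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class FinallyCloseSketch {

    // Records what happened so the behavior can be checked; a real herder would log instead.
    static final List<String> events = new ArrayList<>();

    // Hypothetical helper: close the object only if it is AutoCloseable, and never
    // let an exception thrown by close() escape to the caller.
    static void maybeCloseQuietly(Object maybeCloseable, String name) {
        if (maybeCloseable instanceof AutoCloseable) {
            try {
                ((AutoCloseable) maybeCloseable).close();
                events.add("closed " + name);
            } catch (Exception e) {
                events.add("failed to close " + name + ": " + e.getMessage());
            }
        }
    }

    // A plugin whose close() always throws, to show that the failure is swallowed.
    static class FaultyPlugin implements AutoCloseable {
        @Override
        public void close() throws Exception {
            throw new Exception("boom");
        }
    }

    static String validate(Object plugin, String name) {
        try {
            // ... validation logic using the plugin would go here ...
            return "validated " + name;
        } finally {
            // Runs on both normal return and exceptional exit.
            maybeCloseQuietly(plugin, name);
        }
    }

    public static void main(String[] args) {
        System.out.println(validate(new FaultyPlugin(), "faulty converter"));
        // prints validated faulty converter
        System.out.println(validate("not closeable", "plain object"));
        // prints validated plain object
        System.out.println(events);
        // prints [failed to close faulty converter: boom]
    }
}
```

   Because the cleanup lives in `finally`, it runs whether validation returns 
normally or throws, and a failing `close()` is recorded rather than 
propagated; objects that are not `AutoCloseable` are simply left alone.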



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -380,6 +387,110 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+