C0urante commented on code in PR #14309: URL: https://github.com/apache/kafka/pull/14309#discussion_r1586590857
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -562,8 +709,13 @@ ConfigInfos validateConnectorConfig( configKeys.putAll(configDef.configKeys()); allGroups.addAll(configDef.groups()); configValues.addAll(config.configValues()); - ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); + // do custom converter-specific validation + ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(connectorProps, validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG)); + ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(connectorProps, validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG)); + ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(connectorProps, validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG)); Review Comment: Yeah, the alternative was to pass in the entire `validatedConnectorConfig` and let the various `validateXxxConverterConfig` methods pull out the relevant `ConfigValue` field. But this seemed strange considering those methods only require a single `ConfigValue` object. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } + /** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will use inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + * (e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + * may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param <T> the plugin class to perform validation for + */ + private <T> ConfigInfos validateConverterConfig( + Map<String, String> connectorConfig, + ConfigValue pluginConfigValue, + Class<T> pluginInterface, + Function<T, ConfigDef> configDefAccessor, + String pluginName, + String pluginProperty, + Map<String, String> defaultProperties + ) { + Objects.requireNonNull(connectorConfig); + Objects.requireNonNull(pluginInterface); + Objects.requireNonNull(configDefAccessor); + Objects.requireNonNull(pluginName); + Objects.requireNonNull(pluginProperty); + + String pluginClass = connectorConfig.get(pluginProperty); + + if (pluginClass == null + || pluginConfigValue == null + || !pluginConfigValue.errorMessages().isEmpty() + ) { + // Either no custom converter was specified, or one was specified but there's a problem with it. + // No need to proceed any further. + return null; + } + + T pluginInstance; + try { + pluginInstance = Utils.newInstance(pluginClass, pluginInterface); + } catch (ClassNotFoundException | RuntimeException e) { + log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e); + pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : "")); + return null; + } + + try { + ConfigDef configDef; + try { Review Comment: Good call, added several. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -655,27 +811,38 @@ private static ConfigInfos validateClientOverrides(String connName, ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest( connName, connectorType, connectorClass, clientConfigs, clientType); List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest); - if (configValues != null) { - for (ConfigValue validatedConfigValue : configValues) { - ConfigKey configKey = configKeys.get(validatedConfigValue.name()); - ConfigKeyInfo configKeyInfo = null; - if (configKey != null) { - if (configKey.group != null) { - groups.add(configKey.group); - } - configKeyInfo = convertConfigKey(configKey, prefix); - } - ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(), - validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages()); - if (!configValue.errorMessages().isEmpty()) { - errorCount++; + return prefixedConfigInfos(configDef.configKeys(), configValues, prefix); + } + + private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) { + int errorCount = 0; + Set<String> groups = new LinkedHashSet<>(); + List<ConfigInfo> configInfos = new ArrayList<>(); + + if (configValues == null) { Review Comment: `ConfigDef::validate` is non-final, and plugin instances may return a subclass from their `config` methods that possibly returns null. I acknowledge that this is extremely unlikely, but it seems like this null guard is the best way to handle that scenario as opposed to, throwing an error and causing a 500 response to be returned. Thoughts? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -392,6 +399,146 @@ protected Map<String, ConfigValue> validateSourceConnectorConfig(SourceConnector return configDef.validateAll(config); } + /** + * General-purpose validation logic for converters that are configured directly + * in a connector config (as opposed to inherited from the worker config). + * @param connectorConfig the configuration for the connector; may not be null + * @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config; + * may be null, in which case no validation will be performed under the assumption that the + * connector will use inherit the converter settings from the worker + * @param pluginInterface the interface for the plugin type + * (e.g., {@code org.apache.kafka.connect.storage.Converter.class}); + * may not be null + * @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef} + * from an instance of the plugin type (e.g., {@code Converter::config}); + * may not be null + * @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"}); + * may not be null + * @param pluginProperty the property used to define a custom class for the plugin type + * in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG}); + * may not be null + * @param defaultProperties any default properties to include in the configuration that will be used for + * the plugin; may be null + + * @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config, + * or null if no custom validation was performed (possibly because no custom plugin was defined in the connector + * config) + + * @param <T> the plugin class to perform validation for + */ + private <T> ConfigInfos validateConverterConfig( + Map<String, String> connectorConfig, + ConfigValue pluginConfigValue, + Class<T> pluginInterface, + Function<T, ConfigDef> configDefAccessor, + String pluginName, + String pluginProperty, + Map<String, String> defaultProperties + ) { + Objects.requireNonNull(connectorConfig); + Objects.requireNonNull(pluginInterface); + Objects.requireNonNull(configDefAccessor); + Objects.requireNonNull(pluginName); + Objects.requireNonNull(pluginProperty); + + String pluginClass = connectorConfig.get(pluginProperty); + + if (pluginClass == null + || pluginConfigValue == null + || !pluginConfigValue.errorMessages().isEmpty() + ) { + // Either no custom converter was specified, or one was specified but there's a problem with it. + // No need to proceed any further. + return null; + } + + T pluginInstance; + try { + pluginInstance = Utils.newInstance(pluginClass, pluginInterface); Review Comment: Good point, and definitely worth calling out. I suspect this won't be a resource bottleneck and GC will be sufficient to clean up these instances before they eat up too much memory, but if not then we can certainly look into caching these plugin instances. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org