This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad3369859ca KAFKA-18419: KIP-891 Connect Multiversion Support (Transformation and Predicate Changes) (#17742)
ad3369859ca is described below

commit ad3369859ca65e41be01278e529bc7b9165c9d09
Author: snehashisp <[email protected]>
AuthorDate: Tue Jan 7 01:48:45 2025 +0530

    KAFKA-18419: KIP-891 Connect Multiversion Support (Transformation and Predicate Changes) (#17742)
    
    Reviewers: Greg Harris <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerder.java      |   4 +-
 .../kafka/connect/runtime/CachedConnectors.java    |  11 +-
 .../kafka/connect/runtime/ConnectorConfig.java     | 156 ++++++++++++++-------
 .../runtime/isolation/DelegatingClassLoader.java   |  40 +++---
 .../kafka/connect/runtime/isolation/Plugins.java   |  17 ++-
 .../runtime/isolation/PluginsRecommenders.java     |  67 +++++++++
 .../kafka/connect/runtime/AbstractHerderTest.java  |  21 ++-
 .../kafka/connect/runtime/ConnectorConfigTest.java |  12 +-
 .../connect/runtime/TransformationConfigTest.java  |  39 ++++--
 9 files changed, 255 insertions(+), 112 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 525fc30b1d6..5984a32d3cd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -859,9 +859,11 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
 
             addNullValuedErrors(connectorProps, validatedConnectorConfig);
 
-            ConfigInfos connectorConfigInfo = 
validateConnectorPluginSpecifiedConfigs(connectorProps, 
validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
+            // the order of operations here is important, converter 
validations can add error messages to the connector config
+            // which are collected and converted to ConfigInfos in 
validateConnectorPluginSpecifiedConfigs
             ConfigInfos converterConfigInfo = 
validateAllConverterConfigs(connectorProps, validatedConnectorConfig, 
connectorLoader, reportStage);
             ConfigInfos clientOverrideInfo = 
validateClientOverrides(connectorProps, connectorType, connector.getClass(), 
reportStage, doLog);
+            ConfigInfos connectorConfigInfo = 
validateConnectorPluginSpecifiedConfigs(connectorProps, 
validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
 
             return mergeConfigInfos(connType,
                     connectorConfigInfo,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
index 59c4281fff8..ebfa3522f90 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import 
org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
 
@@ -31,8 +32,8 @@ public class CachedConnectors {
     private static final String LATEST_VERSION = "latest";
 
     private final Map<String, Map<String, Connector>> connectors;
-    private final Map<String, Exception> invalidConnectors;
-    private final Map<String, Map<String, Exception>> invalidVersions;
+    private final Map<String, Throwable> invalidConnectors;
+    private final Map<String, Map<String, VersionedPluginLoadingException>> 
invalidVersions;
     private final Plugins plugins;
 
     public CachedConnectors(Plugins plugins) {
@@ -42,14 +43,14 @@ public class CachedConnectors {
         this.invalidVersions = new ConcurrentHashMap<>();
     }
 
-    private void validate(String connectorName, VersionRange range) throws 
Exception {
+    private void validate(String connectorName, VersionRange range) throws 
ConnectException, VersionedPluginLoadingException {
         if (invalidConnectors.containsKey(connectorName)) {
-            throw new Exception(invalidConnectors.get(connectorName));
+            throw new ConnectException(invalidConnectors.get(connectorName));
         }
 
         String version = range == null ? LATEST_VERSION : range.toString();
         if (invalidVersions.containsKey(connectorName) && 
invalidVersions.get(connectorName).containsKey(version)) {
-            throw new 
Exception(invalidVersions.get(connectorName).get(version));
+            throw new 
VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
         }
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index fb523ac4d75..67102f69cc5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.PluginUtils;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.PluginsRecommenders;
@@ -274,17 +275,17 @@ public class ConnectorConfig extends AbstractConfig {
     public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, 
String> connProps, WorkerConfig workerConfig) {
         PluginsRecommenders recommender = new PluginsRecommenders(plugins);
         ConverterDefaults keyConverterDefaults = converterDefaults(plugins, 
KEY_CONVERTER_CLASS_CONFIG,
-                WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+                WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, 
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, 
PluginType.CONVERTER);
         ConverterDefaults valueConverterDefaults = converterDefaults(plugins, 
VALUE_CONVERTER_CLASS_CONFIG,
-                WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+                WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, 
PluginType.CONVERTER);
         ConverterDefaults headerConverterDefaults = converterDefaults(plugins, 
HEADER_CONVERTER_CLASS_CONFIG,
-                WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, 
HeaderConverter.class);
-        return 
configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)),
+                WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, 
PluginType.HEADER_CONVERTER);
+        return 
configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
 PluginType.SINK, PluginType.SOURCE),
                 keyConverterDefaults, valueConverterDefaults, 
headerConverterDefaults, recommender);
     }
 
     public static ConfigDef enrichedConfigDef(Plugins plugins, String 
connectorClass) {
-        return configDef(plugins.latestVersion(connectorClass), 
CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
+        return configDef(plugins.latestVersion(connectorClass, 
PluginType.SINK, PluginType.SOURCE), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, 
CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
     }
 
     private static ConfigDef.CompositeValidator aliasValidator(String kind) {
@@ -395,10 +396,9 @@ public class ConnectorConfig extends AbstractConfig {
      * <p>
      * {@code requireFullConfig} specifies whether required config values that 
are missing should cause an exception to be thrown.
      */
-    @SuppressWarnings({"rawtypes", "unchecked"})
     public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, 
Map<String, String> props, boolean requireFullConfig) {
         ConfigDef newDef = new ConfigDef(baseConfigDef);
-        new EnrichablePlugin<Transformation<?>>("Transformation", 
TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
+        new EnrichablePlugin<Transformation<?>>("Transformation", 
TRANSFORMS_CONFIG, TRANSFORMS_GROUP, PluginType.TRANSFORMATION,
                 props, requireFullConfig) {
 
             @Override
@@ -417,8 +417,8 @@ public class ConnectorConfig extends AbstractConfig {
             }
 
             @Override
-            protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> 
configDefsForClass(String typeConfig) {
-                return super.configDefsForClass(typeConfig)
+            protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> 
configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
+                return super.configDefsForClass(typeConfig, versionConfig, 
plugins)
                         .filter(entry -> {
                             // The implicit parameters mask any from the 
transformer with the same name
                             if 
(TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
@@ -447,10 +447,16 @@ public class ConnectorConfig extends AbstractConfig {
                             "but there is no config '" + prefixedPredicate + 
"' defining a predicate to be negated.");
                 }
             }
-        }.enrich(newDef);
+
+            @Override
+            protected ConfigDef.Recommender versionRecommender(String 
typeConfig) {
+                return new 
PluginsRecommenders(plugins).transformationPluginRecommender(typeConfig);
+            }
+
+        }.enrich(newDef, plugins);
 
         new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG, 
PREDICATES_GROUP,
-                (Class) Predicate.class, props, requireFullConfig) {
+                PluginType.PREDICATE, props, requireFullConfig) {
             @Override
             protected Set<PluginDesc<Predicate<?>>> plugins() {
                 return plugins.predicates();
@@ -460,7 +466,14 @@ public class ConnectorConfig extends AbstractConfig {
             protected ConfigDef config(Predicate<?> predicate) {
                 return predicate.config();
             }
-        }.enrich(newDef);
+
+            @Override
+            protected ConfigDef.Recommender versionRecommender(String 
typeConfig) {
+                return new 
PluginsRecommenders(plugins).predicatePluginRecommender(typeConfig);
+            }
+
+        }.enrich(newDef, plugins);
+
         return newDef;
     }
 
@@ -471,7 +484,7 @@ public class ConnectorConfig extends AbstractConfig {
             String workerConverterVersionConfig,
             Map<String, String> connectorProps,
             WorkerConfig workerConfig,
-            Class<T> converterType
+            PluginType converterType
     ) {
         /*
         if a converter is specified in the connector config it overrides the 
worker config for the corresponding converter
@@ -510,34 +523,23 @@ public class ConnectorConfig extends AbstractConfig {
 
         String version = null;
         if (connectorConverter != null) {
-            version = fetchPluginVersion(plugins, connectorConverter, 
connectorVersion, connectorConverter);
+            version = fetchPluginVersion(plugins, connectorClass, 
connectorVersion, connectorConverter, converterType);
         } else {
             version = 
workerConfig.originalsStrings().get(workerConverterVersionConfig);
             if (version == null) {
-                version = plugins.latestVersion(workerConverter);
+                version = plugins.latestVersion(workerConverter, 
converterType);
             }
         }
         return new ConverterDefaults(type, version);
     }
 
-    private static void updateKeyDefault(ConfigDef configDef, String 
versionConfigKey, String versionDefault) {
-        ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
-        if (key == null) {
-            return;
-        }
-        configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
-                versionConfigKey, key.type, versionDefault, key.validator, 
key.importance, key.documentation, key.group, key.orderInGroup, key.width, 
key.displayName, key.dependents, key.recommender, false
-        ));
-    }
-
-    @SuppressWarnings("unchecked")
-    private static <T> String fetchPluginVersion(Plugins plugins, String 
connectorClass, String connectorVersion, String pluginName) {
-        if (pluginName == null) {
+    private static <T> String fetchPluginVersion(Plugins plugins, String 
connectorClass, String connectorVersion, String pluginName, PluginType 
pluginType) {
+        if (pluginName == null || connectorClass == null) {
             return null;
         }
         try {
             VersionRange range = 
PluginUtils.connectorVersionRequirement(connectorVersion);
-            return plugins.pluginVersion(pluginName, 
plugins.pluginLoader(connectorClass, range));
+            return plugins.pluginVersion(pluginName, 
plugins.pluginLoader(connectorClass, range), pluginType);
         } catch (InvalidVersionSpecificationException | 
VersionedPluginLoadingException e) {
             // these errors should be captured in other places, so we can 
ignore them here
             log.warn("Failed to determine default plugin version for {}", 
connectorClass, e);
@@ -559,24 +561,27 @@ public class ConnectorConfig extends AbstractConfig {
         private final String aliasKind;
         private final String aliasConfig;
         private final String aliasGroup;
+        private final PluginType pluginType;
         private final Class<T> baseClass;
         private final Map<String, String> props;
         private final boolean requireFullConfig;
 
+        @SuppressWarnings("unchecked")
         public EnrichablePlugin(
                 String aliasKind,
-                String aliasConfig, String aliasGroup, Class<T> baseClass,
+                String aliasConfig, String aliasGroup, PluginType pluginType,
                 Map<String, String> props, boolean requireFullConfig) {
             this.aliasKind = aliasKind;
             this.aliasConfig = aliasConfig;
             this.aliasGroup = aliasGroup;
-            this.baseClass = baseClass;
+            this.pluginType = pluginType;
+            this.baseClass = (Class<T>) pluginType.superClass();
             this.props = props;
             this.requireFullConfig = requireFullConfig;
         }
 
         /** Add the configs for this alias to the given {@code ConfigDef}. */
-        void enrich(ConfigDef newDef) {
+        void enrich(ConfigDef newDef, Plugins plugins) {
             Object aliases = ConfigDef.parseType(aliasConfig, 
props.get(aliasConfig), Type.LIST);
             if (!(aliases instanceof List)) {
                 return;
@@ -593,12 +598,17 @@ public class ConnectorConfig extends AbstractConfig {
                 int orderInGroup = 0;
 
                 final String typeConfig = prefix + "type";
+                final String versionConfig = prefix + 
WorkerConfig.PLUGIN_VERSION_SUFFIX;
+                final String defaultVersion = fetchPluginVersion(plugins, 
props.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
+                        props.get(ConnectorConfig.CONNECTOR_VERSION), 
props.get(typeConfig), pluginType);
+
+                // Add the class configuration
                 final ConfigDef.Validator typeValidator = 
ConfigDef.LambdaValidator.with(
                         (String name, Object value) -> {
                             validateProps(prefix);
                             // The value will be null if the class couldn't be 
found; no point in performing follow-up validation
                             if (value != null) {
-                                
getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                                getConfigDefFromPlugin(typeConfig, ((Class<?>) 
value).getName(), props.getOrDefault(versionConfig, defaultVersion), plugins);
                             }
                         },
                         () -> "valid configs for " + alias + " " + 
aliasKind.toLowerCase(Locale.ENGLISH));
@@ -607,7 +617,25 @@ public class ConnectorConfig extends AbstractConfig {
                         baseClass.getSimpleName() + " type for " + alias,
                         Collections.emptyList(), new ClassRecommender());
 
-                final ConfigDef configDef = populateConfigDef(typeConfig);
+                // Add the version configuration
+                final ConfigDef.Validator versionValidator = (name, value) -> {
+                    if (value != null) {
+                        try {
+                            getConfigDefFromPlugin(typeConfig, 
props.get(typeConfig), (String) value, plugins);
+                        } catch (VersionedPluginLoadingException e) {
+                            throw e;
+                        } catch (Exception e) {
+                            // ignore any other exception here as they are not 
related to version validation and
+                            // will be captured in the validation of the class 
configuration
+                        }
+                    }
+                };
+                newDef.define(versionConfig, Type.STRING, defaultVersion, 
versionValidator, Importance.HIGH,
+                        "Version of the '" + alias + "' " + 
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
+                        baseClass.getSimpleName() + " version for " + alias,
+                        Collections.emptyList(), 
versionRecommender(typeConfig));
+
+                final ConfigDef configDef = populateConfigDef(typeConfig, 
versionConfig, plugins);
                 if (configDef == null) continue;
                 newDef.embed(prefix, group, orderInGroup, configDef);
             }
@@ -621,10 +649,10 @@ public class ConnectorConfig extends AbstractConfig {
          * Populates the ConfigDef according to the configs returned from 
{@code configs()} method of class
          * named in the {@code ...type} parameter of the {@code props}.
          */
-        protected ConfigDef populateConfigDef(String typeConfig) {
+        protected ConfigDef populateConfigDef(String typeConfig, String 
versionConfig, Plugins plugins) {
             final ConfigDef configDef = initialConfigDef();
             try {
-                configDefsForClass(typeConfig)
+                configDefsForClass(typeConfig, versionConfig, plugins)
                         .forEach(entry -> configDef.define(entry.getValue()));
             } catch (ConfigException e) {
                 if (requireFullConfig) {
@@ -640,9 +668,11 @@ public class ConnectorConfig extends AbstractConfig {
          * Return a stream of configs provided by the {@code configs()} method 
of class
          * named in the {@code ...type} parameter of the {@code props}.
          */
-        protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> 
configDefsForClass(String typeConfig) {
-            final Class<?> cls = (Class<?>) ConfigDef.parseType(typeConfig, 
props.get(typeConfig), Type.CLASS);
-            return getConfigDefFromConfigProvidingClass(typeConfig, cls)
+        protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> 
configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
+            if (props.get(typeConfig) == null) {
+                throw new ConfigException(typeConfig, null, "Not a " + 
baseClass.getSimpleName());
+            }
+            return getConfigDefFromPlugin(typeConfig, props.get(typeConfig), 
props.get(versionConfig), plugins)
                     .configKeys().entrySet().stream();
         }
 
@@ -651,30 +681,46 @@ public class ConnectorConfig extends AbstractConfig {
             return new ConfigDef();
         }
 
-        /**
-         * Return {@link ConfigDef} from {@code cls}, which is expected to be 
a non-null {@code Class<T>},
-         * by instantiating it and invoking {@link #config(T)}.
-         * @param key
-         * @param cls The subclass of the baseclass.
-         */
-        ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> 
cls) {
-            if (cls == null) {
-                throw new ConfigException(key, null, "Not a " + 
baseClass.getSimpleName());
+        @SuppressWarnings("unchecked")
+        ConfigDef getConfigDefFromPlugin(String key, String pluginClass, 
String version, Plugins plugins) {
+            String connectorClass = props.get(CONNECTOR_CLASS_CONFIG);
+            if (pluginClass == null || connectorClass == null) {
+                // if transformation class is null or connector class is null, 
we return empty as these validations are done in respective validators
+                return new ConfigDef();
+            }
+            VersionRange connectorVersionRange;
+            try {
+                connectorVersionRange = 
PluginUtils.connectorVersionRequirement(props.get(CONNECTOR_VERSION));
+            } catch (InvalidVersionSpecificationException e) {
+                // this should be caught in connector version validation
+                return new ConfigDef();
             }
+
+            VersionRange pluginVersion;
+            try {
+                pluginVersion = 
PluginUtils.connectorVersionRequirement(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new VersionedPluginLoadingException(e.getMessage());
+            }
+
+            // validate that the plugin class is a subclass of the base class
+            final Class<?> cls = (Class<?>) ConfigDef.parseType(key, 
props.get(key), Type.CLASS);
             Utils.ensureConcreteSubclass(baseClass, cls);
 
-            T pluginInstance;
+            T plugin;
             try {
-                pluginInstance = Utils.newInstance(cls, baseClass);
+                plugin = (T) plugins.newPlugin(pluginClass, pluginVersion, 
plugins.pluginLoader(connectorClass, connectorVersionRange));
+            } catch (VersionedPluginLoadingException e) {
+                throw e;
             } catch (Exception e) {
-                throw new ConfigException(key, String.valueOf(cls), "Error 
getting config definition from " + baseClass.getSimpleName() + ": " + 
e.getMessage());
+                throw new ConfigException(key, pluginClass, "Error getting 
config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
             }
-            ConfigDef configDef = config(pluginInstance);
+            ConfigDef configDef = config(plugin);
             if (null == configDef) {
                 throw new ConnectException(
                         String.format(
                                 "%s.config() must return a ConfigDef that is 
not null.",
-                                cls.getName()
+                                plugin.getClass().getName()
                         )
                 );
             }
@@ -694,6 +740,8 @@ public class ConnectorConfig extends AbstractConfig {
          */
         protected abstract Set<PluginDesc<T>> plugins();
 
+        protected abstract ConfigDef.Recommender versionRecommender(String 
typeConfig);
+
         /**
          * Recommend bundled transformations or predicates.
          */
@@ -741,7 +789,7 @@ public class ConnectorConfig extends AbstractConfig {
             try {
                 PluginUtils.connectorVersionRequirement((String) value);
             } catch (InvalidVersionSpecificationException e) {
-                throw new ConfigException(name, value, e.getMessage());
+                throw new VersionedPluginLoadingException(e.getMessage());
             }
         }
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 6a850439287..bdeb224cfde 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.maven.artifact.versioning.ArtifactVersion;
 import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
 import org.apache.maven.artifact.versioning.VersionRange;
 import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -127,19 +127,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         return aliases.getOrDefault(classOrAlias, classOrAlias);
     }
 
-    String latestVersion(String classOrAlias) {
-        if (classOrAlias == null) {
-            return null;
-        }
-        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
-        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
-        if (inner == null) {
-            return null;
-        }
-        return inner.lastKey().version();
-    }
-
-    String versionInLocation(String classOrAlias, String location) {
+    PluginDesc<?> pluginDesc(String classOrAlias, String preferredLocation, 
Set<PluginType> allowedTypes) {
         if (classOrAlias == null) {
             return null;
         }
@@ -148,12 +136,17 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         if (inner == null) {
             return null;
         }
+        PluginDesc<?> result = null;
         for (Map.Entry<PluginDesc<?>, ClassLoader> entry : inner.entrySet()) {
-            if (entry.getKey().location().equals(location)) {
-                return entry.getKey().version();
+            if (!allowedTypes.contains(entry.getKey().type())) {
+                continue;
+            }
+            result = entry.getKey();
+            if (result.location().equals(preferredLocation)) {
+                return result;
             }
         }
-        return null;
+        return result;
     }
 
     private ClassLoader findPluginLoader(
@@ -170,7 +163,6 @@ public class DelegatingClassLoader extends URLClassLoader {
                         + "Provided soft version: %s ", range));
             }
 
-            ArtifactVersion version = null;
             ClassLoader loader = null;
             for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {
                 // the entries should be in sorted order of versions so this 
should end up picking the latest version which matches the range
@@ -227,19 +219,19 @@ public class DelegatingClassLoader extends URLClassLoader 
{
             if (range == null) {
                 return plugin;
             }
-            verifyClasspathVersionedPlugin(name, plugin, range);
+            verifyClasspathVersionedPlugin(fullName, plugin, range);
         }
         return plugin;
     }
 
-    private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, 
VersionRange range) throws VersionedPluginLoadingException {
+    private void verifyClasspathVersionedPlugin(String fullName, Class<?> 
plugin, VersionRange range) throws VersionedPluginLoadingException {
         String pluginVersion;
-        SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(name);
+        SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(fullName);
 
         if (scannedPlugin == null) {
             throw new VersionedPluginLoadingException(String.format(
                     "Plugin %s is not part of Connect's plugin loading 
mechanism (ClassPath or Plugin Path)",
-                    name
+                    fullName
             ));
         }
 
@@ -255,7 +247,7 @@ public class DelegatingClassLoader extends URLClassLoader {
             throw new VersionedPluginLoadingException(String.format(
                     "Plugin %s has multiple versions specified in class path, "
                             + "only one version is allowed in class path for 
loading a plugin with version range",
-                    name
+                    fullName
             ));
         } else if (classpathPlugins.isEmpty()) {
             throw new VersionedPluginLoadingException("Invalid plugin found in 
classpath");
@@ -264,7 +256,7 @@ public class DelegatingClassLoader extends URLClassLoader {
             if (!range.containsVersion(new 
DefaultArtifactVersion(pluginVersion))) {
                 throw new VersionedPluginLoadingException(String.format(
                         "Plugin %s has version %s which does not match the 
required version range %s",
-                        name,
+                        fullName,
                         pluginVersion,
                         range
                 ), Collections.singletonList(pluginVersion));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 310adb40c7f..8be45e773b3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -265,15 +266,17 @@ public class Plugins {
         };
     }
 
-    public String latestVersion(String classOrAlias) {
-        return delegatingLoader.latestVersion(classOrAlias);
+    public String latestVersion(String classOrAlias, PluginType... 
allowedTypes) {
+        return pluginVersion(classOrAlias, null, allowedTypes);
     }
 
-    public String pluginVersion(String classOrAlias, ClassLoader sourceLoader) 
{
-        if (!(sourceLoader instanceof PluginClassLoader)) {
-            return latestVersion(classOrAlias);
+    public String pluginVersion(String classOrAlias, ClassLoader sourceLoader, 
PluginType... allowedTypes) {
+        String location = (sourceLoader instanceof PluginClassLoader) ? 
((PluginClassLoader) sourceLoader).location() : null;
+        PluginDesc<?> desc = delegatingLoader.pluginDesc(classOrAlias, 
location, new HashSet<>(Arrays.asList(allowedTypes)));
+        if (desc != null) {
+            return desc.version();
         }
-        return delegatingLoader.versionInLocation(classOrAlias, 
((PluginClassLoader) sourceLoader).location());
+        return null;
     }
 
     public DelegatingClassLoader delegatingLoader() {
@@ -376,7 +379,7 @@ public class Plugins {
 
     public Object newPlugin(String classOrAlias, VersionRange range, ClassLoader sourceLoader) throws ClassNotFoundException {
         if (range == null && sourceLoader instanceof PluginClassLoader) {
-            sourceLoader.loadClass(classOrAlias);
+            return newPlugin(sourceLoader.loadClass(classOrAlias));
         }
         return newPlugin(classOrAlias, range);
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index 8cf209ac08c..76f28659726 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -19,10 +19,13 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -74,6 +77,14 @@ public class PluginsRecommenders {
         return headerConverterPluginVersionRecommender;
     }
 
+    public TransformationPluginRecommender transformationPluginRecommender(String classOrAlias) {
+        return new TransformationPluginRecommender(classOrAlias);
+    }
+
+    public PredicatePluginRecommender predicatePluginRecommender(String classOrAlias) {
+        return new PredicatePluginRecommender(classOrAlias);
+    }
+
     public class ConnectorPluginVersionRecommender implements ConfigDef.Recommender {
 
         @SuppressWarnings({"unchecked", "rawtypes"})
@@ -195,4 +206,60 @@ public class PluginsRecommenders {
                     
.map(PluginDesc::version).distinct().collect(Collectors.toList());
         }
     }
+
+    // Recommender for transformation and predicate plugins
+    public abstract class SMTPluginRecommender<T> implements ConfigDef.Recommender {
+
+        protected abstract Function<String, Set<PluginDesc<T>>> plugins();
+
+        protected final String classOrAliasConfig;
+
+        public SMTPluginRecommender(String classOrAliasConfig) {
+            this.classOrAliasConfig = classOrAliasConfig;
+        }
+
+        @Override
+        @SuppressWarnings({"rawtypes"})
+        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+            if (plugins == null) {
+                return Collections.emptyList();
+            }
+            if (parsedConfig.get(classOrAliasConfig) == null) {
+                return Collections.emptyList();
+            }
+
+            Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
+            return plugins().apply(classOrAlias.getName())
+                    .stream().map(PluginDesc::version).distinct().collect(Collectors.toList());
+        }
+
+        @Override
+        public boolean visible(String name, Map<String, Object> parsedConfig) {
+            return true;
+        }
+    }
+
+    public class TransformationPluginRecommender extends SMTPluginRecommender<Transformation<?>> {
+
+        public TransformationPluginRecommender(String classOrAliasConfig) {
+            super(classOrAliasConfig);
+        }
+
+        @Override
+        protected Function<String, Set<PluginDesc<Transformation<?>>>> plugins() {
+            return plugins::transformations;
+        }
+    }
+
+    public class PredicatePluginRecommender extends SMTPluginRecommender<Predicate<?>> {
+
+        public PredicatePluginRecommender(String classOrAliasConfig) {
+            super(classOrAliasConfig);
+        }
+
+        @Override
+        protected Function<String, Set<PluginDesc<Predicate<?>>>> plugins() {
+            return plugins::predicates;
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 01a9bcf6373..6a5556fcf50 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -62,6 +62,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -559,12 +560,14 @@ public class AbstractHerderTest {
     }
 
     @Test
-    public void testConfigValidationTransformsExtendResults() {
+    @SuppressWarnings("rawtypes")
+    public void testConfigValidationTransformsExtendResults() throws 
ClassNotFoundException {
         final Class<? extends Connector> connectorClass = 
SampleSourceConnector.class;
         AbstractHerder herder = createConfigValidationHerder(connectorClass, 
noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
-        
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+        
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+        
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), 
null, classLoader)).thenReturn(new SampleTransformation());
 
         // Define 2 transformations. One has a class defined and so can get 
embedded configs, the other is missing
         // class info that should generate an error.
@@ -575,6 +578,7 @@ public class AbstractHerderTest {
         config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", 
SampleTransformation.class.getName());
         config.put("required", "value"); // connector required config
         ConfigInfos result = herder.validateConnectorConfig(config, s -> null, 
false);
+
         assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
 
         // We expect there to be errors due to the missing name and .... Note 
that these assertions depend heavily on
@@ -596,7 +600,7 @@ public class AbstractHerderTest {
         assertEquals(1, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(31, infos.size());
+        assertEquals(33, infos.size());
         // Should get 2 type fields from the transforms, first adds its own 
config since it has a valid class
         assertEquals("transforms.xformA.type",
                 infos.get("transforms.xformA.type").configValue().name());
@@ -611,12 +615,15 @@ public class AbstractHerderTest {
     }
 
     @Test
-    public void testConfigValidationPredicatesExtendResults() {
+    @SuppressWarnings("rawtypes")
+    public void testConfigValidationPredicatesExtendResults() throws 
ClassNotFoundException {
         final Class<? extends Connector> connectorClass = 
SampleSourceConnector.class;
         AbstractHerder herder = createConfigValidationHerder(connectorClass, 
noneConnectorClientConfigOverridePolicy);
 
-        
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
-        
when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+        
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+        
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+        
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), 
null, classLoader)).thenReturn(new SampleTransformation());
+        
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null, 
classLoader)).thenReturn(new SamplePredicate());
 
         // Define 2 predicates. One has a class defined and so can get 
embedded configs, the other is missing
         // class info that should generate an error.
@@ -653,7 +660,7 @@ public class AbstractHerderTest {
         assertEquals(1, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), 
Function.identity()));
-        assertEquals(33, infos.size());
+        assertEquals(36, infos.size());
         // Should get 2 type fields from the transforms, first adds its own 
config since it has a valid class
         assertEquals("transforms.xformA.type", 
infos.get("transforms.xformA.type").configValue().name());
         
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 6092f8ca7bd..a5af1d13469 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
@@ -455,13 +457,19 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
     }
 
     @Test
-    public void testEnrichedConfigDef() {
+    @SuppressWarnings("rawtypes")
+    public void testEnrichedConfigDef() throws ClassNotFoundException {
         String alias = "hdt";
         String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
TestConnector.class.getName());
         props.put(prefix + "type", 
HasDuplicateConfigTransformation.class.getName());
-        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), 
props, false);
+        Plugins mockPlugins = mock(Plugins.class);
+        
when(mockPlugins.newPlugin(HasDuplicateConfigTransformation.class.getName(),
+                null, (ClassLoader) null)).thenReturn(new 
HasDuplicateConfigTransformation());
+        when(mockPlugins.transformations()).thenReturn(Collections.emptySet());
+        ConfigDef def = ConnectorConfig.enrich(mockPlugins, new ConfigDef(), 
props, false);
         assertEnrichedConfigDef(def, prefix, 
HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
         assertEnrichedConfigDef(def, prefix, 
TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
         assertEnrichedConfigDef(def, prefix, 
TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
index 2a6c0ed2b9d..ef7f17e1d09 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
@@ -29,12 +29,16 @@ import org.apache.kafka.connect.transforms.ReplaceField;
 import org.apache.kafka.connect.transforms.SetSchemaMetadata;
 import org.apache.kafka.connect.transforms.TimestampConverter;
 import org.apache.kafka.connect.transforms.TimestampRouter;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.ValueToKey;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 /**
  * Tests that transformations' configs can be composed with ConnectorConfig 
during its construction, ensuring no
  * conflicting fields or other issues.
@@ -42,8 +46,19 @@ import java.util.HashMap;
  * This test appears here simply because it requires both connect-runtime and 
connect-transforms and connect-runtime
  * already depends on connect-transforms.
  */
+@SuppressWarnings("rawtypes")
 public class TransformationConfigTest {
 
+    private Plugins setupMockPlugins(Transformation transformation) {
+        Plugins plugins = mock(Plugins.class);
+        try {
+            when(plugins.newPlugin(transformation.getClass().getName(), null, 
(ClassLoader) null)).thenReturn(transformation);
+        } catch (ClassNotFoundException e) {
+            // Shouldn't happen since we're mocking the plugins
+        }
+        return plugins;
+    }
+
     @Test
     public void testEmbeddedConfigCast() {
         // Validate that we can construct a Connector config containing the 
extended config for the transform
@@ -54,7 +69,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.type", Cast.Value.class.getName());
         connProps.put("transforms.example.spec", "int8");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new Cast.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -68,7 +83,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.type", 
ExtractField.Value.class.getName());
         connProps.put("transforms.example.field", "field");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new ExtractField.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -81,7 +96,7 @@ public class TransformationConfigTest {
         connProps.put("transforms", "example");
         connProps.put("transforms.example.type", 
Flatten.Value.class.getName());
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new Flatten.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -95,7 +110,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.type", 
HoistField.Value.class.getName());
         connProps.put("transforms.example.field", "field");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new HoistField.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -108,7 +123,7 @@ public class TransformationConfigTest {
         connProps.put("transforms", "example");
         connProps.put("transforms.example.type", 
InsertField.Value.class.getName());
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new InsertField.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -123,7 +138,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.fields", "field");
         connProps.put("transforms.example.replacement", "nothing");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new MaskField.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -138,7 +153,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.regex", "(.*)");
         connProps.put("transforms.example.replacement", "prefix-$1");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new RegexRouter());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -151,7 +166,7 @@ public class TransformationConfigTest {
         connProps.put("transforms", "example");
         connProps.put("transforms.example.type", 
ReplaceField.Value.class.getName());
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new ReplaceField.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -164,7 +179,7 @@ public class TransformationConfigTest {
         connProps.put("transforms", "example");
         connProps.put("transforms.example.type", 
SetSchemaMetadata.Value.class.getName());
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new SetSchemaMetadata.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -178,7 +193,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.type", 
TimestampConverter.Value.class.getName());
         connProps.put("transforms.example.target.type", "unix");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new TimestampConverter.Value());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -191,7 +206,7 @@ public class TransformationConfigTest {
         connProps.put("transforms", "example");
         connProps.put("transforms.example.type", 
TimestampRouter.class.getName());
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new TimestampRouter());
         new ConnectorConfig(plugins, connProps);
     }
 
@@ -205,7 +220,7 @@ public class TransformationConfigTest {
         connProps.put("transforms.example.type", ValueToKey.class.getName());
         connProps.put("transforms.example.fields", "field");
 
-        Plugins plugins = null; // Safe when we're only constructing the config
+        Plugins plugins = setupMockPlugins(new ValueToKey());
         new ConnectorConfig(plugins, connProps);
     }
 

Reply via email to