This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new af0054b5020 KAFKA-18182: KIP-891 Add VersionRange to Plugins and 
DelegatingClassLoader APIs (#16984)
af0054b5020 is described below

commit af0054b50204c17be0cc84b8a3ef4575324bcb79
Author: snehashisp <[email protected]>
AuthorDate: Sat Dec 7 22:50:02 2024 +0530

    KAFKA-18182: KIP-891 Add VersionRange to Plugins and DelegatingClassLoader 
APIs (#16984)
    
    Reviewers: Greg Harris <[email protected]>
---
 build.gradle                                       |   1 +
 .../runtime/isolation/DelegatingClassLoader.java   | 163 +++++++++--
 .../connect/runtime/isolation/PluginDesc.java      |   5 +
 .../connect/runtime/isolation/PluginUtils.java     |  22 +-
 .../kafka/connect/runtime/isolation/Plugins.java   | 298 ++++++++++++++-------
 .../isolation/VersionedPluginLoadingException.java |  40 +++
 .../runtime/isolation/SynchronizationTest.java     |   5 +-
 7 files changed, 416 insertions(+), 118 deletions(-)

diff --git a/build.gradle b/build.gradle
index 0dadfa4beac..5aeec1b5033 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3518,6 +3518,7 @@ project(':connect:runtime') {
 
     implementation libs.slf4jApi
     implementation libs.reload4j
+    implementation libs.slf4jReload4j
     implementation libs.jose4j                    // for SASL/OAUTHBEARER JWT 
validation
     implementation libs.jacksonAnnotations
     implementation libs.jacksonJaxrsJsonProvider
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index fdbadef7b69..2ca7979a526 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,17 +16,23 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.maven.artifact.versioning.ArtifactVersion;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
 
 /**
  * A custom classloader dedicated to loading Connect plugin classes in 
classloading isolation.
@@ -69,36 +75,108 @@ public class DelegatingClassLoader extends URLClassLoader {
 
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
+     *
      * @param name The fully qualified class name of the plugin
      * @return the PluginClassLoader that should be used to load this, or null 
if the plugin is not isolated.
      */
     // VisibleForTesting
-    PluginClassLoader pluginClassLoader(String name) {
+    PluginClassLoader pluginClassLoader(String name, VersionRange range) {
         if (!PluginUtils.shouldLoadInIsolation(name)) {
             return null;
         }
+
         SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+        ClassLoader pluginLoader = findPluginLoader(inner, name, range);
         return pluginLoader instanceof PluginClassLoader
-               ? (PluginClassLoader) pluginLoader
-               : null;
+            ? (PluginClassLoader) pluginLoader
+            : null;
     }
 
-    ClassLoader connectorLoader(String connectorClassOrAlias) {
-        String fullName = aliases.getOrDefault(connectorClassOrAlias, 
connectorClassOrAlias);
-        ClassLoader classLoader = pluginClassLoader(fullName);
-        if (classLoader == null) classLoader = this;
+    PluginClassLoader pluginClassLoader(String name) {
+        return pluginClassLoader(name, null);
+    }
+
+    ClassLoader loader(String classOrAlias, VersionRange range) {
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        ClassLoader classLoader = pluginClassLoader(fullName, range);
+        if (classLoader == null) {
+            classLoader = this;
+        }
         log.debug(
-            "Getting plugin class loader: '{}' for connector: {}",
-            classLoader,
-            connectorClassOrAlias
+                "Got plugin class loader: '{}' for connector: {}",
+                classLoader,
+                classOrAlias
         );
         return classLoader;
     }
 
+    ClassLoader loader(String classOrAlias) {
+        return loader(classOrAlias, null);
+    }
+
+    ClassLoader connectorLoader(String connectorClassOrAlias) {
+        return loader(connectorClassOrAlias);
+    }
+
+    String resolveFullClassName(String classOrAlias) {
+        return aliases.getOrDefault(classOrAlias, classOrAlias);
+    }
+
+    String latestVersion(String classOrAlias) {
+        if (classOrAlias == null) {
+            return null;
+        }
+        String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+        SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(fullName);
+        if (inner == null) {
+            return null;
+        }
+        return inner.lastKey().version();
+    }
+
+    private ClassLoader findPluginLoader(
+        SortedMap<PluginDesc<?>, ClassLoader> loaders,
+        String pluginName,
+        VersionRange range
+    ) {
+
+        if (range != null) {
+
+            if (null != range.getRecommendedVersion()) {
+                throw new VersionedPluginLoadingException(String.format("A 
soft version range is not supported for plugin loading, "
+                        + "this is an internal error as connect should 
automatically convert soft ranges to hard ranges. "
+                        + "Provided soft version: %s ", range));
+            }
+
+            ArtifactVersion version = null;
+            ClassLoader loader = null;
+            for (Map.Entry<PluginDesc<?>, ClassLoader> entry : 
loaders.entrySet()) {
+                // the entries should be in sorted order of versions so this 
should end up picking the latest version which matches the range
+                if (range.containsVersion(entry.getKey().encodedVersion())) {
+                    loader = entry.getValue();
+                }
+            }
+
+            if (loader == null) {
+                List<String> availableVersions = 
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s not found that matches the version range 
%s, available versions: %s",
+                        pluginName,
+                        range,
+                        availableVersions
+                ), availableVersions);
+            }
+            return loader;
+        }
+
+        return loaders.get(loaders.lastKey());
+    }
+
     public void installDiscoveredPlugins(PluginScanResult scanResult) {
         pluginLoaders.putAll(computePluginLoaders(scanResult));
         for (String pluginClassName : pluginLoaders.keySet()) {
@@ -112,21 +190,72 @@ public class DelegatingClassLoader extends URLClassLoader 
{
 
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
+        return loadVersionedPluginClass(name, null, resolve);
+    }
+
+    protected Class<?> loadVersionedPluginClass(
+        String name,
+        VersionRange range,
+        boolean resolve
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
         String fullName = aliases.getOrDefault(name, name);
-        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+        Class<?> plugin;
         if (pluginLoader != null) {
-            log.trace("Retrieving loaded class '{}' from '{}'", fullName, 
pluginLoader);
-            return pluginLoader.loadClass(fullName, resolve);
+            log.trace("Retrieving loaded class '{}' from '{}'", name, 
pluginLoader);
+            plugin = pluginLoader.loadClass(fullName, resolve);
+        } else {
+            plugin = super.loadClass(fullName, resolve);
+            if (range == null) {
+                return plugin;
+            }
+            verifyClasspathVersionedPlugin(name, plugin, range);
+        }
+        return plugin;
+    }
+
+    private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, 
VersionRange range) throws VersionedPluginLoadingException {
+        String pluginVersion;
+        SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = 
pluginLoaders.get(name);
+
+        if (scannedPlugin == null) {
+            throw new VersionedPluginLoadingException(String.format(
+                    "Plugin %s is not part of Connect's plugin loading 
mechanism (ClassPath or Plugin Path)",
+                    name
+            ));
         }
 
-        return super.loadClass(fullName, resolve);
+        List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
+                .filter(pluginDesc -> 
pluginDesc.location().equals("classpath"))
+                .collect(Collectors.toList());
+
+        if (classpathPlugins.size() > 1) {
+            throw new VersionedPluginLoadingException(String.format(
+                    "Plugin %s has multiple versions specified in class path, "
+                            + "only one version is allowed in class path for 
loading a plugin with version range",
+                    name
+            ));
+        } else if (classpathPlugins.isEmpty()) {
+            throw new VersionedPluginLoadingException("Invalid plugin found in 
classpath");
+        } else {
+            pluginVersion = classpathPlugins.get(0).version();
+            if (!range.containsVersion(new 
DefaultArtifactVersion(pluginVersion))) {
+                throw new VersionedPluginLoadingException(String.format(
+                        "Plugin %s has version %s which does not match the 
required version range %s",
+                        name,
+                        pluginVersion,
+                        range
+                ), Collections.singletonList(pluginVersion));
+            }
+        }
     }
 
     private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> 
computePluginLoaders(PluginScanResult plugins) {
         Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new 
HashMap<>();
         plugins.forEach(pluginDesc ->
-                pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new 
TreeMap<>())
-                        .put(pluginDesc, pluginDesc.loader()));
+            pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new 
TreeMap<>())
+                .put(pluginDesc, pluginDesc.loader()));
         return pluginLoaders;
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index a58aef7ceca..ff575e8edf8 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
@@ -59,6 +60,10 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<?>> {
                 ", location='" + location + '\'' +
                 '}';
     }
+    @JsonIgnore
+    DefaultArtifactVersion encodedVersion() {
+        return encodedVersion;
+    }
 
     public Class<? extends T> pluginClass() {
         return klass;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 932e87395f7..56567b3bee7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -469,7 +471,7 @@ public class PluginUtils {
         }
         return distinctUrls(urls);
     }
-    
+
     private static Collection<URL> forClassLoader(ClassLoader classLoader) {
         final Collection<URL> result = new ArrayList<>();
         while (classLoader != null) {
@@ -483,7 +485,7 @@ public class PluginUtils {
         }
         return distinctUrls(result);
     }
-    
+
     private static Collection<URL> distinctUrls(Collection<URL> urls) {
         Map<String, URL> distinct = new HashMap<>(urls.size());
         for (URL url : urls) {
@@ -491,4 +493,20 @@ public class PluginUtils {
         }
         return distinct.values();
     }
+    public static VersionRange connectorVersionRequirement(String version) 
throws InvalidVersionSpecificationException {
+        if (version == null || version.equals("latest")) {
+            return null;
+        }
+        version = version.trim();
+
+        // check first if the given version is valid
+        VersionRange.createFromVersionSpec(version);
+
+        // now if the version is not enclosed we consider it as a hard 
requirement and enclose it in []
+        if (!version.startsWith("[") && !version.startsWith("(")) {
+            version = "[" + version + "]";
+        }
+        return VersionRange.createFromVersionSpec(version);
+    }
+
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 816f870157e..28c1f80c618 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -35,6 +35,8 @@ import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class Plugins {
@@ -168,11 +171,20 @@ public class Plugins {
             String classOrAlias,
             Class<U> pluginClass
     ) throws ClassNotFoundException {
-        Class<?> klass = loader.loadClass(classOrAlias, false);
+        return pluginClass(loader, classOrAlias, pluginClass, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected static <U> Class<? extends U> pluginClass(
+            DelegatingClassLoader loader,
+            String classOrAlias,
+            Class<U> pluginClass,
+            VersionRange range
+    ) throws VersionedPluginLoadingException, ClassNotFoundException {
+        Class<?> klass = loader.loadVersionedPluginClass(classOrAlias, range, 
false);
         if (pluginClass.isAssignableFrom(klass)) {
             return (Class<? extends U>) klass;
         }
-
         throw new ClassNotFoundException(
                 "Requested class: "
                         + classOrAlias
@@ -184,6 +196,10 @@ public class Plugins {
         return pluginClass(delegatingLoader, classOrAlias, Object.class);
     }
 
+    public Class<?> pluginClass(String classOrAlias, VersionRange range) 
throws VersionedPluginLoadingException, ClassNotFoundException {
+        return pluginClass(delegatingLoader, classOrAlias, Object.class, 
range);
+    }
+
     public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
         ClassLoader current = Thread.currentThread().getContextClassLoader();
         if (!current.equals(loader)) {
@@ -240,14 +256,37 @@ public class Plugins {
         };
     }
 
+    public Function<ClassLoader, LoaderSwap> safeLoaderSwapper() {
+        return loader -> {
+            if (!(loader instanceof PluginClassLoader)) {
+                loader = delegatingLoader;
+            }
+            return withClassLoader(loader);
+        };
+    }
+
+    public String latestVersion(String classOrAlias) {
+        return delegatingLoader.latestVersion(classOrAlias);
+    }
+
     public DelegatingClassLoader delegatingLoader() {
         return delegatingLoader;
     }
 
+    // kept for compatibility
     public ClassLoader connectorLoader(String connectorClassOrAlias) {
-        return delegatingLoader.connectorLoader(connectorClassOrAlias);
+        return delegatingLoader.loader(connectorClassOrAlias);
+    }
+
+    public ClassLoader pluginLoader(String classOrAlias, VersionRange range) 
throws ClassNotFoundException, VersionedPluginLoadingException {
+        return delegatingLoader.loader(classOrAlias, range);
     }
 
+    public ClassLoader pluginLoader(String classOrAlias) {
+        return delegatingLoader.loader(classOrAlias);
+    }
+
+
     @SuppressWarnings({"unchecked", "rawtypes"})
     public Set<PluginDesc<Connector>> connectors() {
         Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) 
sinkConnectors());
@@ -259,48 +298,89 @@ public class Plugins {
         return scanResult.sinkConnectors();
     }
 
+    public Set<PluginDesc<SinkConnector>> sinkConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sinkConnectors());
+    }
+
     public Set<PluginDesc<SourceConnector>> sourceConnectors() {
         return scanResult.sourceConnectors();
     }
 
+    public Set<PluginDesc<SourceConnector>> sourceConnectors(String 
connectorClassOrAlias) {
+        return pluginsOfClass(connectorClassOrAlias, 
scanResult.sourceConnectors());
+    }
+
     public Set<PluginDesc<Converter>> converters() {
         return scanResult.converters();
     }
 
+    Set<PluginDesc<Converter>> converters(String converterClassOrAlias) {
+        return pluginsOfClass(converterClassOrAlias, scanResult.converters());
+    }
+
     public Set<PluginDesc<HeaderConverter>> headerConverters() {
         return scanResult.headerConverters();
     }
 
+    Set<PluginDesc<HeaderConverter>> headerConverters(String 
headerConverterClassOrAlias) {
+        return pluginsOfClass(headerConverterClassOrAlias, 
scanResult.headerConverters());
+    }
+
     public Set<PluginDesc<Transformation<?>>> transformations() {
         return scanResult.transformations();
     }
 
+    Set<PluginDesc<Transformation<?>>> transformations(String 
transformationClassOrAlias) {
+        return pluginsOfClass(transformationClassOrAlias, 
scanResult.transformations());
+    }
+
     public Set<PluginDesc<Predicate<?>>> predicates() {
         return scanResult.predicates();
     }
 
+    Set<PluginDesc<Predicate<?>>> predicates(String predicateClassOrAlias) {
+        return pluginsOfClass(predicateClassOrAlias, scanResult.predicates());
+    }
+
     public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
         return scanResult.connectorClientConfigPolicies();
     }
 
+    private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias, 
Set<PluginDesc<T>> allPluginsOfType) {
+        String className = 
delegatingLoader.resolveFullClassName(classNameOrAlias);
+        Set<PluginDesc<T>> plugins = new TreeSet<>();
+        for (PluginDesc<T> desc : allPluginsOfType) {
+            if (desc.className().equals(className)) {
+                plugins.add(desc);
+            }
+        }
+        return plugins;
+    }
+
     public Object newPlugin(String classOrAlias) throws ClassNotFoundException 
{
         Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class);
         return newPlugin(klass);
     }
 
+    public Object newPlugin(String classOrAlias, VersionRange range) throws 
VersionedPluginLoadingException, ClassNotFoundException {
+        Class<?> klass = pluginClass(delegatingLoader, classOrAlias, 
Object.class, range);
+        return newPlugin(klass);
+    }
+
     public Connector newConnector(String connectorClassOrAlias) {
         Class<? extends Connector> klass = 
connectorClass(connectorClassOrAlias);
         return newPlugin(klass);
     }
 
-    public Class<? extends Connector> connectorClass(String 
connectorClassOrAlias) {
+    public Connector newConnector(String connectorClassOrAlias, VersionRange 
range) throws VersionedPluginLoadingException {
+        Class<? extends Connector> klass = 
connectorClass(connectorClassOrAlias, range);
+        return newPlugin(klass);
+    }
+
+    public Class<? extends Connector> connectorClass(String 
connectorClassOrAlias, VersionRange range) throws 
VersionedPluginLoadingException {
         Class<? extends Connector> klass;
         try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    connectorClassOrAlias,
-                    Connector.class
-            );
+            klass = pluginClass(delegatingLoader, connectorClassOrAlias, 
Connector.class, range);
         } catch (ClassNotFoundException e) {
             List<PluginDesc<? extends Connector>> matches = new ArrayList<>();
             Set<PluginDesc<Connector>> connectors = connectors();
@@ -336,6 +416,10 @@ public class Plugins {
         return klass;
     }
 
+    public Class<? extends Connector> connectorClass(String 
connectorClassOrAlias) {
+        return connectorClass(connectorClassOrAlias, null);
+    }
+
     public Task newTask(Class<? extends Task> taskClass) {
         return newPlugin(taskClass);
     }
@@ -350,54 +434,49 @@ public class Plugins {
      * @throws ConnectException if the {@link Converter} implementation class 
could not be found
      */
     public Converter newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        return newConverter(config, classPropertyName, null, classLoaderUsage);
+    }
+
+    /**
+     * Used to get a versioned converter. If the version is specified, it will 
always use the plugins classloader.
+     *
+     * @param config              the configuration containing the {@link 
Converter}'s configuration; may not be null
+     * @param classPropertyName   the name of the property that contains the 
name of the {@link Converter} class; may not be null
+     * @param versionPropertyName the name of the property that contains the 
version of the {@link Converter} class; may not be null
+     * @return the instantiated and configured {@link Converter}; null if the 
configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class 
could not be found
+     * @throws VersionedPluginLoadingException if the version requested is not 
found
+     */
+    public Converter newConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER : ClassLoaderUsage.PLUGINS;
+        return newConverter(config, classPropertyName, versionPropertyName, 
classLoader);
+    }
+
+    private Converter newConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
         if (!config.originals().containsKey(classPropertyName)) {
             // This configuration does not define the converter via the 
specified property name
             return null;
         }
-
-        Class<? extends Converter> klass = null;
-        switch (classLoaderUsage) {
-            case CURRENT_CLASSLOADER:
-                // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
-                // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
-                klass = pluginClassFromConfig(config, classPropertyName, 
Converter.class, scanResult.converters());
-                break;
-            case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                try {
-                    klass = pluginClass(delegatingLoader, 
converterClassOrAlias, Converter.class);
-                } catch (ClassNotFoundException e) {
-                    throw new ConnectException(
-                            "Failed to find any class that implements 
Converter and which name matches "
-                            + converterClassOrAlias + ", available converters 
are: "
-                            + pluginNames(scanResult.converters())
-                    );
-                }
-                break;
-        }
-        if (klass == null) {
-            throw new ConnectException("Unable to initialize the Converter 
specified in '" + classPropertyName + "'");
-        }
-
         // Determine whether this is a key or value converter based upon the 
supplied property name ...
         final boolean isKeyConverter = 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
 
         // Configure the Converter using only the old configuration mechanism 
...
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
+
         log.debug("Configuring the {} converter with configuration keys:{}{}",
-                  isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig.keySet());
+                isKeyConverter ? "key" : "value", System.lineSeparator(), 
converterConfig.keySet());
 
-        Converter plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        Converter plugin = newVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                Converter.class, classLoaderUsage, scanResult.converters());
+        try (LoaderSwap loaderSwap = 
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
             plugin.configure(converterConfig, isKeyConverter);
         }
         return plugin;
     }
 
+
+
     /**
      * Load an internal converter, used by the worker for (de)serializing data 
in internal topics.
      *
@@ -427,99 +506,124 @@ public class Plugins {
      * If the given configuration defines a {@link HeaderConverter} using the 
named configuration property, return a new configured
      * instance.
      *
-     * @param config             the configuration containing the {@link 
Converter}'s configuration; may not be null
-     * @param classPropertyName  the name of the property that contains the 
name of the {@link Converter} class; may not be null
-     * @param classLoaderUsage   which classloader should be used
+     * @param config             the configuration containing the {@link 
HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the 
name of the {@link HeaderConverter} class; may not be null
+     * @param classLoaderUsage   which classloader should be used
      * @return the instantiated and configured {@link HeaderConverter}; null 
if the configuration did not define the specified property
      * @throws ConnectException if the {@link HeaderConverter} implementation 
class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        Class<? extends HeaderConverter> klass = null;
-        switch (classLoaderUsage) {
-            case CURRENT_CLASSLOADER:
-                if (!config.originals().containsKey(classPropertyName)) {
-                    // This connector configuration does not define the header 
converter via the specified property name
-                    return null;
-                }
-                // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
-                // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
-                break;
-            case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
-                // Note that there will always be at least a default header 
converter for the worker
-                String converterClassOrAlias = 
config.getClass(classPropertyName).getName();
-                try {
-                    klass = pluginClass(
-                            delegatingLoader,
-                            converterClassOrAlias,
-                            HeaderConverter.class
-                    );
-                } catch (ClassNotFoundException e) {
-                    throw new ConnectException(
-                            "Failed to find any class that implements 
HeaderConverter and which name matches "
-                                    + converterClassOrAlias
-                                    + ", available header converters are: "
-                                    + 
pluginNames(scanResult.headerConverters())
-                    );
-                }
-        }
-        if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
HeaderConverter specified in '" + classPropertyName + "'");
+        return newHeaderConverter(config, classPropertyName, null, 
classLoaderUsage);
+    }
+
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the
+     * named configuration property, return a new configured instance.
+     * <p>
+     * The class loader strategy is derived from the version setting: when the
+     * value of {@code versionPropertyName} is non-null the plugins classloader
+     * is always used, otherwise the current classloader is used.
+     *
+     * @param config                the configuration containing the {@link HeaderConverter}'s
+     *                              configuration; may not be null
+     * @param classPropertyName     the name of the property that contains the name of the
+     *                              {@link HeaderConverter} class; may not be null
+     * @param versionPropertyName   the name of the property that contains the version
+     *                              requirement for the {@link HeaderConverter}.
+     *                              NOTE(review): it is passed to {@code config.getString}
+     *                              unconditionally - confirm a null name is never supplied here
+     * @return the instantiated and configured {@link HeaderConverter}; null if no version
+     *         is configured and the configuration did not define the class property
+     * @throws ConnectException if the {@link HeaderConverter} implementation class could
+     *         not be found, or if the configured version is not a valid version range
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName) {
+        ClassLoaderUsage classLoader = config.getString(versionPropertyName) 
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER : ClassLoaderUsage.PLUGINS;
+        return newHeaderConverter(config, classPropertyName, 
versionPropertyName, classLoader);
+    }
+
+    /**
+     * Instantiate the {@link HeaderConverter} named by {@code classPropertyName}
+     * through {@code newVersionedPlugin} and configure it.
+     * <p>
+     * The converter receives every original property found under the
+     * {@code classPropertyName + "."} prefix, plus
+     * {@code ConverterConfig.TYPE_CONFIG} set to the header converter type.
+     * {@code configure} is invoked inside the {@code LoaderSwap} obtained from
+     * {@code safeLoaderSwapper()} for the plugin's own class loader.
+     *
+     * @param config              the configuration to read the class, version and
+     *                            converter properties from
+     * @param classPropertyName   the name of the property that names the converter class
+     * @param versionPropertyName the name of the property holding the version
+     *                            requirement; may be null, it is handed to
+     *                            {@code newVersionedPlugin} unchanged
+     * @param classLoaderUsage    the class loader strategy used to resolve the class
+     * @return the configured converter, or null when the class property is absent
+     *         from the original configs and {@code classLoaderUsage} is
+     *         {@code CURRENT_CLASSLOADER}
+     */
+    private HeaderConverter newHeaderConverter(AbstractConfig config, String 
classPropertyName, String versionPropertyName, ClassLoaderUsage 
classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName) && 
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+            // This configuration does not define the Header Converter via the 
specified property name
+            return null;
         }
+        HeaderConverter plugin = newVersionedPlugin(config, classPropertyName, 
versionPropertyName,
+                HeaderConverter.class, classLoaderUsage, 
scanResult.headerConverters());
 
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = 
config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName());
         log.debug("Configuring the header converter with configuration 
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
 
-        HeaderConverter plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        try (LoaderSwap loaderSwap = 
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
             plugin.configure(converterConfig);
         }
         return plugin;
     }
 
-    public ConfigProvider newConfigProvider(AbstractConfig config, String 
providerPrefix, ClassLoaderUsage classLoaderUsage) {
-        String classPropertyName = providerPrefix + ".class";
-        Map<String, String> originalConfig = config.originalsStrings();
-        if (!originalConfig.containsKey(classPropertyName)) {
-            // This configuration does not define the config provider via the 
specified property name
-            return null;
+    /**
+     * Resolve the plugin class named by {@code classPropertyName}, optionally
+     * constrained to a version range, and return a new instance of it. The
+     * instance is not configured here; that is left to the caller.
+     * <p>
+     * The version string is read from {@code versionPropertyName} (skipped when
+     * that name is null) and parsed with
+     * {@code PluginUtils.connectorVersionRequirement}. A non-null range is only
+     * expected together with {@code ClassLoaderUsage.PLUGINS}; this is checked
+     * with an {@code assert} only.
+     * <ul>
+     *   <li>{@code CURRENT_CLASSLOADER}: the class is resolved with
+     *       {@code pluginClassFromConfig}; the version range is not passed along.</li>
+     *   <li>{@code PLUGINS}: the class name or alias is taken from the original
+     *       configs, falling back to {@code config.getClass(classPropertyName)},
+     *       and resolved with {@code pluginClass} against the delegating loader
+     *       and the version range.</li>
+     * </ul>
+     * The instance is created by {@code newPlugin} while the resolved class's
+     * own class loader is swapped in via {@code withClassLoader}.
+     *
+     * @param config              the configuration holding the class and version properties
+     * @param classPropertyName   the name of the property that names the plugin class
+     * @param versionPropertyName the name of the property holding the version
+     *                            requirement; null means no version constraint
+     * @param basePluginClass     the base plugin type (raw {@code Class}); its simple
+     *                            name is used in error messages
+     * @param classLoaderUsage    the class loader strategy used to resolve the class
+     * @param availablePlugins    the known plugins of this type; used for resolution in
+     *                            the {@code CURRENT_CLASSLOADER} case and listed in the
+     *                            "not found" error message
+     * @return a new, not yet configured plugin instance
+     * @throws ConnectException if the version string is not a valid version range,
+     *         if no matching class is found in the {@code PLUGINS} case, or if no
+     *         class could be resolved at all.
+     *         NOTE(review): the {@code ClassNotFoundException} is not attached as
+     *         the cause of the {@code ConnectException} - confirm this is intended
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <U> U newVersionedPlugin(
+            AbstractConfig config,
+            String classPropertyName,
+            String versionPropertyName,
+            Class basePluginClass,
+            ClassLoaderUsage classLoaderUsage,
+            SortedSet<PluginDesc<U>> availablePlugins
+    ) {
+
+        String version = versionPropertyName == null ? null : 
config.getString(versionPropertyName);
+        VersionRange range = null;
+        if (version != null) {
+            try {
+                range = PluginUtils.connectorVersionRequirement(version);
+            } catch (InvalidVersionSpecificationException e) {
+                throw new ConnectException(String.format("Invalid version 
range for %s: %s", classPropertyName, version), e);
+            }
         }
-        Class<? extends ConfigProvider> klass = null;
+
+        assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS;
+
+        Class<? extends U> klass = null;
+        String basePluginClassName = basePluginClass.getSimpleName();
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                klass = pluginClassFromConfig(config, classPropertyName, 
ConfigProvider.class, scanResult.configProviders());
+                klass = pluginClassFromConfig(config, classPropertyName, 
basePluginClass, availablePlugins);
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
-                String configProviderClassOrAlias = 
originalConfig.get(classPropertyName);
+
+                // if the config specifies the class name, use it, otherwise 
use the default which we can get from config.getClass
+                String classOrAlias = 
config.originalsStrings().get(classPropertyName);
+                if (classOrAlias == null) {
+                    classOrAlias = 
config.getClass(classPropertyName).getName();
+                }
                 try {
-                    klass = pluginClass(delegatingLoader, 
configProviderClassOrAlias, ConfigProvider.class);
+                    klass = pluginClass(delegatingLoader, classOrAlias, 
basePluginClass, range);
                 } catch (ClassNotFoundException e) {
                     throw new ConnectException(
-                            "Failed to find any class that implements 
ConfigProvider and which name matches "
-                                    + configProviderClassOrAlias + ", 
available ConfigProviders are: "
-                                    + pluginNames(scanResult.configProviders())
+                            "Failed to find any class that implements " + 
basePluginClassName + " and which name matches "
+                                    + classOrAlias + ", available plugins are: 
"
+                                    + pluginNames(availablePlugins)
                     );
                 }
                 break;
         }
         if (klass == null) {
-            throw new ConnectException("Unable to initialize the 
ConfigProvider specified in '" + classPropertyName + "'");
+            throw new ConnectException("Unable to initialize the " + 
basePluginClassName
+                    + " specified in " + classPropertyName);
         }
 
+        U plugin;
+        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
+            plugin = newPlugin(klass);
+        }
+        return plugin;
+    }
+
+    public ConfigProvider newConfigProvider(AbstractConfig config, String 
providerPrefix, ClassLoaderUsage classLoaderUsage) {
+        String classPropertyName = providerPrefix + ".class";
+        Map<String, String> originalConfig = config.originalsStrings();
+        if (!originalConfig.containsKey(classPropertyName)) {
+            // This configuration does not define the config provider via the 
specified property name
+            return null;
+        }
+
+        ConfigProvider plugin = newVersionedPlugin(config, classPropertyName, 
null, ConfigProvider.class, classLoaderUsage, scanResult.configProviders());
+
         // Configure the ConfigProvider
         String configPrefix = providerPrefix + ".param.";
         Map<String, Object> configProviderConfig = 
config.originalsWithPrefix(configPrefix);
-
-        ConfigProvider plugin;
-        try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
-            plugin = newPlugin(klass);
+        try (LoaderSwap loaderSwap = 
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
             plugin.configure(configProviderConfig);
         }
         return plugin;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
new file mode 100644
index 00000000000..8fb3042549b
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.List;
+
+/**
+ * A {@link ConfigException} that can additionally carry a list of available
+ * versions.
+ * <p>
+ * NOTE(review): presumably raised when a plugin cannot be loaded at the
+ * requested version; the throw sites are not visible here - confirm.
+ */
+public class VersionedPluginLoadingException extends ConfigException {
+
+    // Stays null unless the two-argument constructor is used.
+    private List<String> availableVersions = null;
+
+    /**
+     * Create the exception without any version information;
+     * {@link #availableVersions()} will return null.
+     *
+     * @param message the error message
+     */
+    public VersionedPluginLoadingException(String message) {
+        super(message);
+    }
+
+    /**
+     * Create the exception together with the versions that are available.
+     *
+     * @param message           the error message
+     * @param availableVersions the available versions; the list is stored as
+     *                          given, without copying
+     */
+    public VersionedPluginLoadingException(String message, List<String> 
availableVersions) {
+        super(message);
+        this.availableVersions = availableVersions;
+    }
+
+    /**
+     * @return the list passed at construction (the same instance, not a copy),
+     *         or null if none was supplied
+     */
+    public List<String> availableVersions() {
+        return availableVersions;
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 70b875b21b8..3c990917400 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
 
+import org.apache.maven.artifact.versioning.VersionRange;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -190,10 +191,10 @@ public class SynchronizationTest {
         }
 
         @Override
-        public PluginClassLoader pluginClassLoader(String name) {
+        public PluginClassLoader pluginClassLoader(String name, VersionRange 
range) {
+            // Wait on the breakpoint twice for this name before delegating to the
+            // superclass. NOTE(review): presumably one await parks the thread and
+            // the second releases it - confirm against the breakpoint helper.
             dclBreakpoint.await(name);
             dclBreakpoint.await(name);
-            return super.pluginClassLoader(name);
+            return super.pluginClassLoader(name, range);
         }
     }
 


Reply via email to