This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8cb85274e9 MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult (#13771)
c8cb85274e9 is described below

commit c8cb85274e9e7c7c797d2d498b54fa58ec238e0e
Author: Greg Harris <greg.har...@aiven.io>
AuthorDate: Tue Jun 6 06:37:57 2023 -0700

    MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult (#13771)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../runtime/isolation/DelegatingClassLoader.java   | 184 +++++----------------
 .../connect/runtime/isolation/PluginDesc.java      |  11 +-
 .../runtime/isolation/PluginScanResult.java        |  98 +++++++----
 .../connect/runtime/isolation/PluginUtils.java     |  47 +++---
 .../kafka/connect/runtime/isolation/Plugins.java   |  39 +++--
 .../connect/runtime/isolation/PluginDescTest.java  |  15 ++
 .../connect/runtime/isolation/PluginUtilsTest.java | 182 ++++++++++++++++++++
 7 files changed, 357 insertions(+), 219 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index cedabe5a828..cf0a5a1ba7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.connect.components.Versioned;
-import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -48,10 +47,11 @@ import java.security.PrivilegedAction;
 import java.sql.Driver;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.ServiceConfigurationError;
 import java.util.ServiceLoader;
 import java.util.Set;
@@ -79,15 +79,6 @@ public class DelegatingClassLoader extends URLClassLoader {
 
     private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> 
pluginLoaders;
     private final ConcurrentMap<String, String> aliases;
-    private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors;
-    private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors;
-    private final SortedSet<PluginDesc<Converter>> converters;
-    private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
-    private final SortedSet<PluginDesc<Transformation<?>>> transformations;
-    private final SortedSet<PluginDesc<Predicate<?>>> predicates;
-    private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
-    private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
-    private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
     private final List<Path> pluginLocations;
 
     // Although this classloader does not load classes directly but rather 
delegates loading to a
@@ -103,15 +94,6 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.pluginLocations = pluginLocations;
         this.pluginLoaders = new ConcurrentHashMap<>();
         this.aliases = new ConcurrentHashMap<>();
-        this.sinkConnectors = new TreeSet<>();
-        this.sourceConnectors = new TreeSet<>();
-        this.converters = new TreeSet<>();
-        this.headerConverters = new TreeSet<>();
-        this.transformations = new TreeSet<>();
-        this.predicates = new TreeSet<>();
-        this.configProviders = new TreeSet<>();
-        this.restExtensions = new TreeSet<>();
-        this.connectorClientConfigPolicies = new TreeSet<>();
     }
 
     public DelegatingClassLoader(List<Path> pluginLocations) {
@@ -122,49 +104,6 @@ public class DelegatingClassLoader extends URLClassLoader {
         this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public Set<PluginDesc<Connector>> connectors() {
-        Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) 
sinkConnectors);
-        connectors.addAll((Set) sourceConnectors);
-        return connectors;
-    }
-
-    public Set<PluginDesc<SinkConnector>> sinkConnectors() {
-        return sinkConnectors;
-    }
-
-    public Set<PluginDesc<SourceConnector>> sourceConnectors() {
-        return sourceConnectors;
-    }
-
-    public Set<PluginDesc<Converter>> converters() {
-        return converters;
-    }
-
-    public Set<PluginDesc<HeaderConverter>> headerConverters() {
-        return headerConverters;
-    }
-
-    public Set<PluginDesc<Transformation<?>>> transformations() {
-        return transformations;
-    }
-
-    public Set<PluginDesc<Predicate<?>>> predicates() {
-        return predicates;
-    }
-
-    public Set<PluginDesc<ConfigProvider>> configProviders() {
-        return configProviders;
-    }
-
-    public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
-        return restExtensions;
-    }
-
-    public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
-        return connectorClientConfigPolicies;
-    }
-
     /**
      * Retrieve the PluginClassLoader associated with a plugin class
      * @param name The fully qualified class name of the plugin
@@ -208,24 +147,11 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader 
loader) {
-        for (PluginDesc<T> plugin : plugins) {
-            String pluginClassName = plugin.className();
-            SortedMap<PluginDesc<?>, ClassLoader> inner = 
pluginLoaders.get(pluginClassName);
-            if (inner == null) {
-                inner = new TreeMap<>();
-                pluginLoaders.put(pluginClassName, inner);
-                // TODO: once versioning is enabled this line should be moved 
outside this if branch
-                log.info("Added plugin '{}'", pluginClassName);
-            }
-            inner.put(plugin, loader);
-        }
-    }
-
-    protected void initLoaders() {
+    public PluginScanResult initLoaders() {
+        List<PluginScanResult> results = new ArrayList<>();
         for (Path pluginLocation : pluginLocations) {
             try {
-                registerPlugin(pluginLocation);
+                results.add(registerPlugin(pluginLocation));
             } catch (InvalidPathException | MalformedURLException e) {
                 log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
             } catch (IOException e) {
@@ -233,14 +159,16 @@ public class DelegatingClassLoader extends URLClassLoader 
{
             }
         }
         // Finally add parent/system loader.
-        scanUrlsAndAddPlugins(
+        results.add(scanUrlsAndAddPlugins(
                 getParent(),
                 ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        );
-        addAllAliases();
+        ));
+        PluginScanResult scanResult = new PluginScanResult(results);
+        installDiscoveredPlugins(scanResult);
+        return scanResult;
     }
 
-    private void registerPlugin(Path pluginLocation)
+    private PluginScanResult registerPlugin(Path pluginLocation)
         throws IOException {
         log.info("Loading plugin from: {}", pluginLocation);
         List<URL> pluginUrls = new ArrayList<>();
@@ -256,37 +184,17 @@ public class DelegatingClassLoader extends URLClassLoader 
{
                 urls,
                 this
         );
-        scanUrlsAndAddPlugins(loader, urls);
+        return scanUrlsAndAddPlugins(loader, urls);
     }
 
-    private void scanUrlsAndAddPlugins(
+    private PluginScanResult scanUrlsAndAddPlugins(
             ClassLoader loader,
             URL[] urls
     ) {
         PluginScanResult plugins = scanPluginPath(loader, urls);
         log.info("Registered loader: {}", loader);
-        if (!plugins.isEmpty()) {
-            addPlugins(plugins.sinkConnectors(), loader);
-            sinkConnectors.addAll(plugins.sinkConnectors());
-            addPlugins(plugins.sourceConnectors(), loader);
-            sourceConnectors.addAll(plugins.sourceConnectors());
-            addPlugins(plugins.converters(), loader);
-            converters.addAll(plugins.converters());
-            addPlugins(plugins.headerConverters(), loader);
-            headerConverters.addAll(plugins.headerConverters());
-            addPlugins(plugins.transformations(), loader);
-            transformations.addAll(plugins.transformations());
-            addPlugins(plugins.predicates(), loader);
-            predicates.addAll(plugins.predicates());
-            addPlugins(plugins.configProviders(), loader);
-            configProviders.addAll(plugins.configProviders());
-            addPlugins(plugins.restExtensions(), loader);
-            restExtensions.addAll(plugins.restExtensions());
-            addPlugins(plugins.connectorClientConfigPolicies(), loader);
-            
connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies());
-        }
-
         loadJdbcDrivers(loader);
+        return plugins;
     }
 
     private void loadJdbcDrivers(final ClassLoader loader) {
@@ -344,16 +252,16 @@ public class DelegatingClassLoader extends URLClassLoader 
{
     }
 
     @SuppressWarnings({"unchecked"})
-    private Collection<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
-        return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) 
getPluginDesc(reflections, Predicate.class, loader);
+    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, loader);
     }
 
     @SuppressWarnings({"unchecked"})
-    private Collection<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
-        return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) 
getPluginDesc(reflections, Transformation.class, loader);
+    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, loader);
     }
 
-    private <T> Collection<PluginDesc<T>> getPluginDesc(
+    private <T> SortedSet<PluginDesc<T>> getPluginDesc(
             Reflections reflections,
             Class<T> klass,
             ClassLoader loader
@@ -364,10 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         } catch (ReflectionsException e) {
             log.debug("Reflections scanner could not find any classes for 
URLs: " +
                     reflections.getConfiguration().getUrls(), e);
-            return Collections.emptyList();
+            return Collections.emptySortedSet();
         }
 
-        Collection<PluginDesc<T>> result = new ArrayList<>();
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
         for (Class<? extends T> pluginKlass : plugins) {
             if (!PluginUtils.isConcrete(pluginKlass)) {
                 log.debug("Skipping {} as it is not concrete implementation", 
pluginKlass);
@@ -393,8 +301,8 @@ public class DelegatingClassLoader extends URLClassLoader {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
-        Collection<PluginDesc<T>> result = new ArrayList<>();
+    private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
         ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
         for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
             try (LoaderSwap loaderSwap = withClassLoader(loader)) {
@@ -461,6 +369,17 @@ public class DelegatingClassLoader extends URLClassLoader {
         }
     }
 
+    private void installDiscoveredPlugins(PluginScanResult scanResult) {
+        pluginLoaders.putAll(computePluginLoaders(scanResult));
+        for (String pluginClassName : pluginLoaders.keySet()) {
+            log.info("Added plugin '{}'", pluginClassName);
+        }
+        aliases.putAll(PluginUtils.computeAliases(scanResult));
+        for (Map.Entry<String, String> alias : aliases.entrySet()) {
+            log.info("Added alias '{}' to plugin '{}'", alias.getKey(), 
alias.getValue());
+        }
+    }
+
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
         String fullName = aliases.getOrDefault(name, name);
@@ -473,35 +392,12 @@ public class DelegatingClassLoader extends URLClassLoader 
{
         return super.loadClass(fullName, resolve);
     }
 
-    private void addAllAliases() {
-        addAliases(connectors());
-        addAliases(converters);
-        addAliases(headerConverters);
-        addAliases(transformations);
-        addAliases(predicates);
-        addAliases(restExtensions);
-        addAliases(connectorClientConfigPolicies);
-    }
-
-    private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
-        for (PluginDesc<S> plugin : plugins) {
-            if (PluginUtils.isAliasUnique(plugin, plugins)) {
-                String simple = PluginUtils.simpleName(plugin);
-                String pruned = PluginUtils.prunedName(plugin);
-                aliases.put(simple, plugin.className());
-                if (simple.equals(pruned)) {
-                    log.info("Added alias '{}' to plugin '{}'", simple, 
plugin.className());
-                } else {
-                    aliases.put(pruned, plugin.className());
-                    log.info(
-                            "Added aliases '{}' and '{}' to plugin '{}'",
-                            simple,
-                            pruned,
-                            plugin.className()
-                    );
-                }
-            }
-        }
+    private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> 
computePluginLoaders(PluginScanResult plugins) {
+        Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new 
HashMap<>();
+        plugins.forEach(pluginDesc ->
+                pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new 
TreeMap<>())
+                        .put(pluginDesc, pluginDesc.loader()));
+        return pluginLoaders;
     }
 
     private static class InternalReflections extends Reflections {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index 62a7d6cd65d..c2829c60273 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -29,6 +29,7 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
     private final PluginType type;
     private final String typeName;
     private final String location;
+    private final ClassLoader loader;
 
     public PluginDesc(Class<? extends T> klass, String version, ClassLoader 
loader) {
         this.klass = klass;
@@ -40,6 +41,7 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
         this.location = loader instanceof PluginClassLoader
                 ? ((PluginClassLoader) loader).location()
                 : "classpath";
+        this.loader = loader;
     }
 
     @Override
@@ -83,6 +85,10 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
         return location;
     }
 
+    public ClassLoader loader() {
+        return loader;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -105,6 +111,9 @@ public class PluginDesc<T> implements 
Comparable<PluginDesc<T>> {
     @Override
     public int compareTo(PluginDesc<T> other) {
         int nameComp = name.compareTo(other.name);
-        return nameComp != 0 ? nameComp : 
encodedVersion.compareTo(other.encodedVersion);
+        int versionComp = encodedVersion.compareTo(other.encodedVersion);
+        // isolated plugins appear after classpath plugins when they have 
identical versions.
+        int isolatedComp = Boolean.compare(other.loader instanceof 
PluginClassLoader, loader instanceof PluginClassLoader);
+        return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : 
isolatedComp);
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index 565fe6dc67d..ae015ed3507 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -27,32 +27,35 @@ import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Arrays;
-import java.util.Collection;
+import java.util.SortedSet;
 import java.util.List;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 public class PluginScanResult {
-    private final Collection<PluginDesc<SinkConnector>> sinkConnectors;
-    private final Collection<PluginDesc<SourceConnector>> sourceConnectors;
-    private final Collection<PluginDesc<Converter>> converters;
-    private final Collection<PluginDesc<HeaderConverter>> headerConverters;
-    private final Collection<PluginDesc<Transformation<?>>> transformations;
-    private final Collection<PluginDesc<Predicate<?>>> predicates;
-    private final Collection<PluginDesc<ConfigProvider>> configProviders;
-    private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
-    private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
-
-    private final List<Collection<?>> allPlugins;
+    private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors;
+    private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors;
+    private final SortedSet<PluginDesc<Converter>> converters;
+    private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
+    private final SortedSet<PluginDesc<Transformation<?>>> transformations;
+    private final SortedSet<PluginDesc<Predicate<?>>> predicates;
+    private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
+    private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
+    private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies;
+
+    private final List<SortedSet<? extends PluginDesc<?>>> allPlugins;
 
     public PluginScanResult(
-            Collection<PluginDesc<SinkConnector>> sinkConnectors,
-            Collection<PluginDesc<SourceConnector>> sourceConnectors,
-            Collection<PluginDesc<Converter>> converters,
-            Collection<PluginDesc<HeaderConverter>> headerConverters,
-            Collection<PluginDesc<Transformation<?>>> transformations,
-            Collection<PluginDesc<Predicate<?>>> predicates,
-            Collection<PluginDesc<ConfigProvider>> configProviders,
-            Collection<PluginDesc<ConnectRestExtension>> restExtensions,
-            Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies
+            SortedSet<PluginDesc<SinkConnector>> sinkConnectors,
+            SortedSet<PluginDesc<SourceConnector>> sourceConnectors,
+            SortedSet<PluginDesc<Converter>> converters,
+            SortedSet<PluginDesc<HeaderConverter>> headerConverters,
+            SortedSet<PluginDesc<Transformation<?>>> transformations,
+            SortedSet<PluginDesc<Predicate<?>>> predicates,
+            SortedSet<PluginDesc<ConfigProvider>> configProviders,
+            SortedSet<PluginDesc<ConnectRestExtension>> restExtensions,
+            SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies
     ) {
         this.sinkConnectors = sinkConnectors;
         this.sourceConnectors = sourceConnectors;
@@ -64,49 +67,78 @@ public class PluginScanResult {
         this.restExtensions = restExtensions;
         this.connectorClientConfigPolicies = connectorClientConfigPolicies;
         this.allPlugins =
-            Arrays.asList(sinkConnectors, sourceConnectors, converters, 
headerConverters, transformations, configProviders,
-                          connectorClientConfigPolicies);
+            Arrays.asList(sinkConnectors, sourceConnectors, converters, 
headerConverters, transformations, predicates,
+                    configProviders, restExtensions, 
connectorClientConfigPolicies);
+    }
+
+    /**
+     * Merge one or more {@link PluginScanResult results} into one result 
object
+     */
+    public PluginScanResult(List<PluginScanResult> results) {
+        this(
+                merge(results, PluginScanResult::sinkConnectors),
+                merge(results, PluginScanResult::sourceConnectors),
+                merge(results, PluginScanResult::converters),
+                merge(results, PluginScanResult::headerConverters),
+                merge(results, PluginScanResult::transformations),
+                merge(results, PluginScanResult::predicates),
+                merge(results, PluginScanResult::configProviders),
+                merge(results, PluginScanResult::restExtensions),
+                merge(results, PluginScanResult::connectorClientConfigPolicies)
+        );
+    }
+
+    private static <R extends Comparable<R>> SortedSet<R> 
merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> 
accessor) {
+        SortedSet<R> merged = new TreeSet<>();
+        for (PluginScanResult element : results) {
+            merged.addAll(accessor.apply(element));
+        }
+        return merged;
     }
 
-    public Collection<PluginDesc<SinkConnector>> sinkConnectors() {
+    public SortedSet<PluginDesc<SinkConnector>> sinkConnectors() {
         return sinkConnectors;
     }
 
-    public Collection<PluginDesc<SourceConnector>> sourceConnectors() {
+    public SortedSet<PluginDesc<SourceConnector>> sourceConnectors() {
         return sourceConnectors;
     }
 
-    public Collection<PluginDesc<Converter>> converters() {
+    public SortedSet<PluginDesc<Converter>> converters() {
         return converters;
     }
 
-    public Collection<PluginDesc<HeaderConverter>> headerConverters() {
+    public SortedSet<PluginDesc<HeaderConverter>> headerConverters() {
         return headerConverters;
     }
 
-    public Collection<PluginDesc<Transformation<?>>> transformations() {
+    public SortedSet<PluginDesc<Transformation<?>>> transformations() {
         return transformations;
     }
 
-    public Collection<PluginDesc<Predicate<?>>> predicates() {
+    public SortedSet<PluginDesc<Predicate<?>>> predicates() {
         return predicates;
     }
 
-    public Collection<PluginDesc<ConfigProvider>> configProviders() {
+    public SortedSet<PluginDesc<ConfigProvider>> configProviders() {
         return configProviders;
     }
 
-    public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
+    public SortedSet<PluginDesc<ConnectRestExtension>> restExtensions() {
         return restExtensions;
     }
 
-    public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
+    public SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
         return connectorClientConfigPolicies;
     }
 
+    public void forEach(Consumer<PluginDesc<?>> consumer) {
+        allPlugins.forEach(plugins -> plugins.forEach(consumer));
+    }
+
     public boolean isEmpty() {
         boolean isEmpty = true;
-        for (Collection<?> plugins : allPlugins) {
+        for (SortedSet<?> plugins : allPlugins) {
             isEmpty = isEmpty && plugins.isEmpty();
         }
         return isEmpty;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index e7e0271d16b..3592e0ec96c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -28,13 +28,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Pattern;
@@ -352,31 +353,6 @@ public class PluginUtils {
         }
     }
 
-    /**
-     * Verify whether a given plugin's alias matches another alias in a 
collection of plugins.
-     *
-     * @param alias the plugin descriptor to test for alias matching.
-     * @param plugins the collection of plugins to test against.
-     * @param <U> the plugin type.
-     * @return false if a match was found in the collection, otherwise true.
-     */
-    public static <U> boolean isAliasUnique(
-            PluginDesc<U> alias,
-            Collection<PluginDesc<U>> plugins
-    ) {
-        boolean matched = false;
-        for (PluginDesc<U> plugin : plugins) {
-            if (simpleName(alias).equals(simpleName(plugin))
-                    || prunedName(alias).equals(prunedName(plugin))) {
-                if (matched) {
-                    return false;
-                }
-                matched = true;
-            }
-        }
-        return true;
-    }
-
     private static String prunePluginName(PluginDesc<?> plugin, String suffix) 
{
         String simple = plugin.pluginClass().getSimpleName();
         int pos = simple.lastIndexOf(suffix);
@@ -386,6 +362,25 @@ public class PluginUtils {
         return simple;
     }
 
+    public static Map<String, String> computeAliases(PluginScanResult 
scanResult) {
+        Map<String, Set<String>> aliasCollisions = new HashMap<>();
+        scanResult.forEach(pluginDesc -> {
+            aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> 
new HashSet<>()).add(pluginDesc.className());
+            aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> 
new HashSet<>()).add(pluginDesc.className());
+        });
+        Map<String, String> aliases = new HashMap<>();
+        for (Map.Entry<String, Set<String>> entry : 
aliasCollisions.entrySet()) {
+            String alias = entry.getKey();
+            Set<String> classNames = entry.getValue();
+            if (classNames.size() == 1) {
+                aliases.put(alias, classNames.stream().findAny().get());
+            } else {
+                log.warn("Ignoring ambiguous alias '{}' since it refers to 
multiple distinct plugins {}", alias, classNames);
+            }
+        }
+        return aliases;
+    }
+
     private static class DirectoryEntry {
         final DirectoryStream<Path> stream;
         final Iterator<Path> iterator;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index cc763ae8dbc..a43f0f226a2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -45,6 +45,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 
 public class Plugins {
 
@@ -55,6 +56,7 @@ public class Plugins {
 
     private static final Logger log = LoggerFactory.getLogger(Plugins.class);
     private final DelegatingClassLoader delegatingLoader;
+    private final PluginScanResult scanResult;
 
     public Plugins(Map<String, String> props) {
         this(props, Plugins.class.getClassLoader());
@@ -65,7 +67,7 @@ public class Plugins {
         String pluginPath = WorkerConfig.pluginPath(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = newDelegatingClassLoader(pluginLocations, parent);
-        delegatingLoader.initLoaders();
+        scanResult = delegatingLoader.initLoaders();
     }
 
     // VisibleForTesting
@@ -194,32 +196,39 @@ public class Plugins {
         return delegatingLoader.connectorLoader(connectorClassOrAlias);
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public Set<PluginDesc<Connector>> connectors() {
+        Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) 
sinkConnectors());
+        connectors.addAll((Set) sourceConnectors());
+        return connectors;
+    }
+
     public Set<PluginDesc<SinkConnector>> sinkConnectors() {
-        return delegatingLoader.sinkConnectors();
+        return scanResult.sinkConnectors();
     }
 
     public Set<PluginDesc<SourceConnector>> sourceConnectors() {
-        return delegatingLoader.sourceConnectors();
+        return scanResult.sourceConnectors();
     }
 
     public Set<PluginDesc<Converter>> converters() {
-        return delegatingLoader.converters();
+        return scanResult.converters();
     }
 
     public Set<PluginDesc<HeaderConverter>> headerConverters() {
-        return delegatingLoader.headerConverters();
+        return scanResult.headerConverters();
     }
 
     public Set<PluginDesc<Transformation<?>>> transformations() {
-        return delegatingLoader.transformations();
+        return scanResult.transformations();
     }
 
     public Set<PluginDesc<Predicate<?>>> predicates() {
-        return delegatingLoader.predicates();
+        return scanResult.predicates();
     }
 
     public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> 
connectorClientConfigPolicies() {
-        return delegatingLoader.connectorClientConfigPolicies();
+        return scanResult.connectorClientConfigPolicies();
     }
 
     public Object newPlugin(String classOrAlias) throws ClassNotFoundException 
{
@@ -242,7 +251,7 @@ public class Plugins {
             );
         } catch (ClassNotFoundException e) {
             List<PluginDesc<? extends Connector>> matches = new ArrayList<>();
-            Set<PluginDesc<Connector>> connectors = 
delegatingLoader.connectors();
+            Set<PluginDesc<Connector>> connectors = connectors();
             for (PluginDesc<? extends Connector> plugin : connectors) {
                 Class<?> pluginClass = plugin.pluginClass();
                 String simpleName = pluginClass.getSimpleName();
@@ -300,7 +309,7 @@ public class Plugins {
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because 
Converter doesn't implement Configurable, and even if it did
                 // we have to remove the property prefixes before calling 
config(...) and we still always want to call Converter.config.
-                klass = pluginClassFromConfig(config, classPropertyName, 
Converter.class, delegatingLoader.converters());
+                klass = pluginClassFromConfig(config, classPropertyName, 
Converter.class, scanResult.converters());
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
@@ -311,7 +320,7 @@ public class Plugins {
                     throw new ConnectException(
                             "Failed to find any class that implements 
Converter and which name matches "
                             + converterClassOrAlias + ", available converters 
are: "
-                            + pluginNames(delegatingLoader.converters())
+                            + pluginNames(scanResult.converters())
                     );
                 }
                 break;
@@ -383,7 +392,7 @@ public class Plugins {
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because we 
have to remove the property prefixes
                 // before calling config(...)
-                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, delegatingLoader.headerConverters());
+                klass = pluginClassFromConfig(config, classPropertyName, 
HeaderConverter.class, scanResult.headerConverters());
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback.
@@ -400,7 +409,7 @@ public class Plugins {
                             "Failed to find any class that implements 
HeaderConverter and which name matches "
                                     + converterClassOrAlias
                                     + ", available header converters are: "
-                                    + 
pluginNames(delegatingLoader.headerConverters())
+                                    + 
pluginNames(scanResult.headerConverters())
                     );
                 }
         }
@@ -432,7 +441,7 @@ public class Plugins {
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
                 // Attempt to load first with the current classloader, and 
plugins as a fallback.
-                klass = pluginClassFromConfig(config, classPropertyName, 
ConfigProvider.class, delegatingLoader.configProviders());
+                klass = pluginClassFromConfig(config, classPropertyName, 
ConfigProvider.class, scanResult.configProviders());
                 break;
             case PLUGINS:
                 // Attempt to load with the plugin class loader, which uses 
the current classloader as a fallback
@@ -443,7 +452,7 @@ public class Plugins {
                     throw new ConnectException(
                             "Failed to find any class that implements 
ConfigProvider and which name matches "
                                     + configProviderClassOrAlias + ", 
available ConfigProviders are: "
-                                    + 
pluginNames(delegatingLoader.configProviders())
+                                    + pluginNames(scanResult.configProviders())
                     );
                 }
                 break;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
index 72a2493e7f0..affd7df268c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -223,6 +224,20 @@ public class PluginDescTest {
         );
 
         assertNewer(transformDescPluginPath, transformDescClasspath);
+
+        PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>(
+                Predicate.class,
+                regularVersion,
+                pluginLoader
+        );
+
+        PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>(
+                Predicate.class,
+                regularVersion,
+                systemLoader
+        );
+
+        assertNewer(predicateDescPluginPath, predicateDescClasspath);
     }
 
     private static <T> void assertPluginDesc(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index 19766989fcf..3e4c219b0ab 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -28,7 +39,11 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -490,6 +505,173 @@ public class PluginUtilsTest {
         assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
     }
 
+    @Test
+    public void testNonCollidingAliases() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new 
TreeSet<>();
+        sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, 
MockSourceConnector.class.getClassLoader()));
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                sourceConnectors,
+                converters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        // Nothing collides, so each plugin keeps its simple and shortened alias.
+        Map<String, String> actualAliases = PluginUtils.computeAliases(result);
+        Map<String, String> expectedAliases = new HashMap<>();
+        expectedAliases.put("MockSinkConnector", 
MockSinkConnector.class.getName());
+        expectedAliases.put("MockSink", MockSinkConnector.class.getName());
+        expectedAliases.put("MockSourceConnector", 
MockSourceConnector.class.getName());
+        expectedAliases.put("MockSource", MockSourceConnector.class.getName());
+        expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
+        expectedAliases.put("Colliding", CollidingConverter.class.getName());
+        assertEquals(expectedAliases, actualAliases);
+    }
+
+    @Test
+    public void testMultiVersionAlias() {
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
+        // distinct versions don't cause an alias collision (the class name is 
the same)
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, 
MockSinkConnector.class.getClassLoader()));
+        sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", 
MockSinkConnector.class.getClassLoader()));
+        assertEquals(2, sinkConnectors.size()); // precondition: both versions are separate set entries
+        PluginScanResult result = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> actualAliases = PluginUtils.computeAliases(result);
+        Map<String, String> expectedAliases = new HashMap<>(); // one alias pair, mapped to the single class name
+        expectedAliases.put("MockSinkConnector", 
MockSinkConnector.class.getName());
+        expectedAliases.put("MockSink", MockSinkConnector.class.getName());
+        assertEquals(expectedAliases, actualAliases);
+    }
+
+    @Test
+    public void testCollidingPrunedAlias() {
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        SortedSet<PluginDesc<HeaderConverter>> headerConverters = new 
TreeSet<>();
+        headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, 
null, CollidingHeaderConverter.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                converters,
+                headerConverters,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> actualAliases = PluginUtils.computeAliases(result);
+        Map<String, String> expectedAliases = new HashMap<>(); // only the full simple names; no shortened "Colliding" entry
+        expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
+        expectedAliases.put("CollidingHeaderConverter", 
CollidingHeaderConverter.class.getName());
+        assertEquals(expectedAliases, actualAliases);
+    }
+
+    @SuppressWarnings("unchecked") // covers the Class<?> double cast of Colliding.class below
+    @Test
+    public void testCollidingSimpleAlias() {
+        SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
+        converters.add(new PluginDesc<>(CollidingConverter.class, null, 
CollidingConverter.class.getClassLoader()));
+        SortedSet<PluginDesc<Transformation<?>>> transformations = new 
TreeSet<>();
+        transformations.add(new PluginDesc<>((Class<? extends 
Transformation<?>>) (Class<?>) Colliding.class, null, 
Colliding.class.getClassLoader()));
+        PluginScanResult result = new PluginScanResult(
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                converters,
+                Collections.emptySortedSet(),
+                transformations,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        Map<String, String> actualAliases = PluginUtils.computeAliases(result);
+        Map<String, String> expectedAliases = new HashMap<>(); // "Colliding" is not expected as an alias for either plugin
+        expectedAliases.put("CollidingConverter", 
CollidingConverter.class.getName());
+        assertEquals(expectedAliases, actualAliases);
+    }
+
+    public static class CollidingConverter implements Converter { // minimal Converter stub; the alias tests reference it via .class
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object 
value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
+
+    public static class CollidingHeaderConverter implements HeaderConverter { // minimal HeaderConverter stub; the alias tests reference it via .class
+
+        @Override
+        public SchemaAndValue toConnectHeader(String topic, String headerKey, 
byte[] value) {
+            return null;
+        }
+
+        @Override
+        public byte[] fromConnectHeader(String topic, String headerKey, Schema 
schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
+
+    public static class Colliding<R extends ConnectRecord<R>> implements 
Transformation<R> { // minimal Transformation stub whose simple class name is "Colliding"
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public R apply(R record) {
+            return null;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
     private void createBasicDirectoryLayout() throws IOException {
         Files.createDirectories(pluginPath.resolve("connectorA"));
         Files.createDirectories(pluginPath.resolve("connectorB/deps"));


Reply via email to