This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c8cb85274e9 MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult (#13771) c8cb85274e9 is described below commit c8cb85274e9e7c7c797d2d498b54fa58ec238e0e Author: Greg Harris <greg.har...@aiven.io> AuthorDate: Tue Jun 6 06:37:57 2023 -0700 MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult (#13771) Reviewers: Chris Egerton <chr...@aiven.io> --- .../runtime/isolation/DelegatingClassLoader.java | 184 +++++---------------- .../connect/runtime/isolation/PluginDesc.java | 11 +- .../runtime/isolation/PluginScanResult.java | 98 +++++++---- .../connect/runtime/isolation/PluginUtils.java | 47 +++--- .../kafka/connect/runtime/isolation/Plugins.java | 39 +++-- .../connect/runtime/isolation/PluginDescTest.java | 15 ++ .../connect/runtime/isolation/PluginUtilsTest.java | 182 ++++++++++++++++++++ 7 files changed, 357 insertions(+), 219 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index cedabe5a828..cf0a5a1ba7f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.isolation; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.connect.components.Versioned; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.sink.SinkConnector; @@ -48,10 +47,11 @@ import java.security.PrivilegedAction; import java.sql.Driver; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; import java.util.Set; @@ -79,15 +79,6 @@ public class DelegatingClassLoader extends URLClassLoader { private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders; private final ConcurrentMap<String, String> aliases; - private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors; - private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors; - private final SortedSet<PluginDesc<Converter>> converters; - private final SortedSet<PluginDesc<HeaderConverter>> headerConverters; - private final SortedSet<PluginDesc<Transformation<?>>> transformations; - private final SortedSet<PluginDesc<Predicate<?>>> predicates; - private final SortedSet<PluginDesc<ConfigProvider>> configProviders; - private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions; - private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies; private final List<Path> pluginLocations; // Although this classloader does not load classes directly but rather delegates loading to a @@ -103,15 +94,6 @@ public class DelegatingClassLoader extends URLClassLoader { this.pluginLocations = pluginLocations; this.pluginLoaders = new ConcurrentHashMap<>(); this.aliases = new ConcurrentHashMap<>(); - this.sinkConnectors = new TreeSet<>(); - this.sourceConnectors = new TreeSet<>(); - this.converters = new TreeSet<>(); - this.headerConverters = new TreeSet<>(); - this.transformations = new TreeSet<>(); - this.predicates = new TreeSet<>(); - this.configProviders = new TreeSet<>(); - this.restExtensions = new TreeSet<>(); - this.connectorClientConfigPolicies = new TreeSet<>(); } public DelegatingClassLoader(List<Path> pluginLocations) { @@ -122,49 +104,6 @@ public class DelegatingClassLoader extends URLClassLoader { this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } - @SuppressWarnings({"unchecked", "rawtypes"}) - public Set<PluginDesc<Connector>> connectors() { - Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors); - connectors.addAll((Set) sourceConnectors); - return connectors; - } - - public Set<PluginDesc<SinkConnector>> sinkConnectors() { - return sinkConnectors; - } - - public Set<PluginDesc<SourceConnector>> sourceConnectors() { - return sourceConnectors; - } - - public Set<PluginDesc<Converter>> converters() { - return converters; - } - - public Set<PluginDesc<HeaderConverter>> headerConverters() { - return headerConverters; - } - - public Set<PluginDesc<Transformation<?>>> transformations() { - return transformations; - } - - public Set<PluginDesc<Predicate<?>>> predicates() { - return predicates; - } - - public Set<PluginDesc<ConfigProvider>> configProviders() { - return configProviders; - } - - public Set<PluginDesc<ConnectRestExtension>> restExtensions() { - return restExtensions; - } - - public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { - return connectorClientConfigPolicies; - } - /** * Retrieve the PluginClassLoader associated with a plugin class * @param name The fully qualified class name of the plugin @@ -208,24 +147,11 @@ public class DelegatingClassLoader extends URLClassLoader { ); } - private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) { - for (PluginDesc<T> plugin : plugins) { - String pluginClassName = plugin.className(); - SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName); - if (inner == null) { - inner = new TreeMap<>(); - pluginLoaders.put(pluginClassName, inner); - // TODO: once versioning is enabled this line should be moved outside this if branch - log.info("Added plugin '{}'", pluginClassName); - } - inner.put(plugin, loader); - } - } - - protected void initLoaders() { + public PluginScanResult initLoaders() { + List<PluginScanResult> results = new ArrayList<>(); for (Path pluginLocation : pluginLocations) { try { - registerPlugin(pluginLocation); + results.add(registerPlugin(pluginLocation)); } catch (InvalidPathException | MalformedURLException e) { log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); } catch (IOException e) { @@ -233,14 +159,16 @@ public class DelegatingClassLoader extends URLClassLoader { } } // Finally add parent/system loader. - scanUrlsAndAddPlugins( + results.add(scanUrlsAndAddPlugins( getParent(), ClasspathHelper.forJavaClassPath().toArray(new URL[0]) - ); - addAllAliases(); + )); + PluginScanResult scanResult = new PluginScanResult(results); + installDiscoveredPlugins(scanResult); + return scanResult; } - private void registerPlugin(Path pluginLocation) + private PluginScanResult registerPlugin(Path pluginLocation) throws IOException { log.info("Loading plugin from: {}", pluginLocation); List<URL> pluginUrls = new ArrayList<>(); @@ -256,37 +184,17 @@ public class DelegatingClassLoader extends URLClassLoader { urls, this ); - scanUrlsAndAddPlugins(loader, urls); + return scanUrlsAndAddPlugins(loader, urls); } - private void scanUrlsAndAddPlugins( + private PluginScanResult scanUrlsAndAddPlugins( ClassLoader loader, URL[] urls ) { PluginScanResult plugins = scanPluginPath(loader, urls); log.info("Registered loader: {}", loader); - if (!plugins.isEmpty()) { - addPlugins(plugins.sinkConnectors(), loader); - sinkConnectors.addAll(plugins.sinkConnectors()); - addPlugins(plugins.sourceConnectors(), loader); - sourceConnectors.addAll(plugins.sourceConnectors()); - addPlugins(plugins.converters(), loader); - converters.addAll(plugins.converters()); - addPlugins(plugins.headerConverters(), loader); - headerConverters.addAll(plugins.headerConverters()); - addPlugins(plugins.transformations(), loader); - transformations.addAll(plugins.transformations()); - addPlugins(plugins.predicates(), loader); - predicates.addAll(plugins.predicates()); - addPlugins(plugins.configProviders(), loader); - configProviders.addAll(plugins.configProviders()); - addPlugins(plugins.restExtensions(), loader); - restExtensions.addAll(plugins.restExtensions()); - addPlugins(plugins.connectorClientConfigPolicies(), loader); - connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies()); - } - loadJdbcDrivers(loader); + return plugins; } private void loadJdbcDrivers(final ClassLoader loader) { @@ -344,16 +252,16 @@ public class DelegatingClassLoader extends URLClassLoader { } @SuppressWarnings({"unchecked"}) - private Collection<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) { - return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) getPluginDesc(reflections, Predicate.class, loader); + private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) { + return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader); } @SuppressWarnings({"unchecked"}) - private Collection<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) { - return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) getPluginDesc(reflections, Transformation.class, loader); + private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) { + return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader); } - private <T> Collection<PluginDesc<T>> getPluginDesc( + private <T> SortedSet<PluginDesc<T>> getPluginDesc( Reflections reflections, Class<T> klass, ClassLoader loader @@ -364,10 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader { } catch (ReflectionsException e) { log.debug("Reflections scanner could not find any classes for URLs: " + reflections.getConfiguration().getUrls(), e); - return Collections.emptyList(); + return Collections.emptySortedSet(); } - Collection<PluginDesc<T>> result = new ArrayList<>(); + SortedSet<PluginDesc<T>> result = new TreeSet<>(); for (Class<? extends T> pluginKlass : plugins) { if (!PluginUtils.isConcrete(pluginKlass)) { log.debug("Skipping {} as it is not concrete implementation", pluginKlass); @@ -393,8 +301,8 @@ public class DelegatingClassLoader extends URLClassLoader { } @SuppressWarnings("unchecked") - private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) { - Collection<PluginDesc<T>> result = new ArrayList<>(); + private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) { + SortedSet<PluginDesc<T>> result = new TreeSet<>(); ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader); for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) { try (LoaderSwap loaderSwap = withClassLoader(loader)) { @@ -461,6 +369,17 @@ public class DelegatingClassLoader extends URLClassLoader { } } + private void installDiscoveredPlugins(PluginScanResult scanResult) { + pluginLoaders.putAll(computePluginLoaders(scanResult)); + for (String pluginClassName : pluginLoaders.keySet()) { + log.info("Added plugin '{}'", pluginClassName); + } + aliases.putAll(PluginUtils.computeAliases(scanResult)); + for (Map.Entry<String, String> alias : aliases.entrySet()) { + log.info("Added alias '{}' to plugin '{}'", alias.getKey(), alias.getValue()); + } + } + @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { String fullName = aliases.getOrDefault(name, name); @@ -473,35 +392,12 @@ public class DelegatingClassLoader extends URLClassLoader { return super.loadClass(fullName, resolve); } - private void addAllAliases() { - addAliases(connectors()); - addAliases(converters); - addAliases(headerConverters); - addAliases(transformations); - addAliases(predicates); - addAliases(restExtensions); - addAliases(connectorClientConfigPolicies); - } - - private <S> void addAliases(Collection<PluginDesc<S>> plugins) { - for (PluginDesc<S> plugin : plugins) { - if (PluginUtils.isAliasUnique(plugin, plugins)) { - String simple = PluginUtils.simpleName(plugin); - String pruned = PluginUtils.prunedName(plugin); - aliases.put(simple, plugin.className()); - if (simple.equals(pruned)) { - log.info("Added alias '{}' to plugin '{}'", simple, plugin.className()); - } else { - aliases.put(pruned, plugin.className()); - log.info( - "Added aliases '{}' and '{}' to plugin '{}'", - simple, - pruned, - plugin.className() - ); - } - } - } + private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> computePluginLoaders(PluginScanResult plugins) { + Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new HashMap<>(); + plugins.forEach(pluginDesc -> + pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>()) + .put(pluginDesc, pluginDesc.loader())); + return pluginLoaders; } private static class InternalReflections extends Reflections { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 62a7d6cd65d..c2829c60273 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -29,6 +29,7 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { private final PluginType type; private final String typeName; private final String location; + private final ClassLoader loader; public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) { this.klass = klass; @@ -40,6 +41,7 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { this.location = loader instanceof PluginClassLoader ? ((PluginClassLoader) loader).location() : "classpath"; + this.loader = loader; } @Override @@ -83,6 +85,10 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { return location; } + public ClassLoader loader() { + return loader; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -105,6 +111,9 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> { @Override public int compareTo(PluginDesc<T> other) { int nameComp = name.compareTo(other.name); - return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion); + int versionComp = encodedVersion.compareTo(other.encodedVersion); + // isolated plugins appear after classpath plugins when they have identical versions. + int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); + return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index 565fe6dc67d..ae015ed3507 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -27,32 +27,35 @@ import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import java.util.Arrays; -import java.util.Collection; +import java.util.SortedSet; import java.util.List; +import java.util.TreeSet; +import java.util.function.Consumer; +import java.util.function.Function; public class PluginScanResult { - private final Collection<PluginDesc<SinkConnector>> sinkConnectors; - private final Collection<PluginDesc<SourceConnector>> sourceConnectors; - private final Collection<PluginDesc<Converter>> converters; - private final Collection<PluginDesc<HeaderConverter>> headerConverters; - private final Collection<PluginDesc<Transformation<?>>> transformations; - private final Collection<PluginDesc<Predicate<?>>> predicates; - private final Collection<PluginDesc<ConfigProvider>> configProviders; - private final Collection<PluginDesc<ConnectRestExtension>> restExtensions; - private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies; - - private final List<Collection<?>> allPlugins; + private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors; + private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors; + private final SortedSet<PluginDesc<Converter>> converters; + private final SortedSet<PluginDesc<HeaderConverter>> headerConverters; + private final SortedSet<PluginDesc<Transformation<?>>> transformations; + private final SortedSet<PluginDesc<Predicate<?>>> predicates; + private final SortedSet<PluginDesc<ConfigProvider>> configProviders; + private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions; + private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies; + + private final List<SortedSet<? extends PluginDesc<?>>> allPlugins; public PluginScanResult( - Collection<PluginDesc<SinkConnector>> sinkConnectors, - Collection<PluginDesc<SourceConnector>> sourceConnectors, - Collection<PluginDesc<Converter>> converters, - Collection<PluginDesc<HeaderConverter>> headerConverters, - Collection<PluginDesc<Transformation<?>>> transformations, - Collection<PluginDesc<Predicate<?>>> predicates, - Collection<PluginDesc<ConfigProvider>> configProviders, - Collection<PluginDesc<ConnectRestExtension>> restExtensions, - Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies + SortedSet<PluginDesc<SinkConnector>> sinkConnectors, + SortedSet<PluginDesc<SourceConnector>> sourceConnectors, + SortedSet<PluginDesc<Converter>> converters, + SortedSet<PluginDesc<HeaderConverter>> headerConverters, + SortedSet<PluginDesc<Transformation<?>>> transformations, + SortedSet<PluginDesc<Predicate<?>>> predicates, + SortedSet<PluginDesc<ConfigProvider>> configProviders, + SortedSet<PluginDesc<ConnectRestExtension>> restExtensions, + SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies ) { this.sinkConnectors = sinkConnectors; this.sourceConnectors = sourceConnectors; @@ -64,49 +67,78 @@ public class PluginScanResult { this.restExtensions = restExtensions; this.connectorClientConfigPolicies = connectorClientConfigPolicies; this.allPlugins = - Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, configProviders, - connectorClientConfigPolicies); + Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, predicates, + configProviders, restExtensions, connectorClientConfigPolicies); + } + + /** + * Merge one or more {@link PluginScanResult results} into one result object + */ + public PluginScanResult(List<PluginScanResult> results) { + this( + merge(results, PluginScanResult::sinkConnectors), + merge(results, PluginScanResult::sourceConnectors), + merge(results, PluginScanResult::converters), + merge(results, PluginScanResult::headerConverters), + merge(results, PluginScanResult::transformations), + merge(results, PluginScanResult::predicates), + merge(results, PluginScanResult::configProviders), + merge(results, PluginScanResult::restExtensions), + merge(results, PluginScanResult::connectorClientConfigPolicies) + ); + } + + private static <R extends Comparable<R>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) { + SortedSet<R> merged = new TreeSet<>(); + for (PluginScanResult element : results) { + merged.addAll(accessor.apply(element)); + } + return merged; } - public Collection<PluginDesc<SinkConnector>> sinkConnectors() { + public SortedSet<PluginDesc<SinkConnector>> sinkConnectors() { return sinkConnectors; } - public Collection<PluginDesc<SourceConnector>> sourceConnectors() { + public SortedSet<PluginDesc<SourceConnector>> sourceConnectors() { return sourceConnectors; } - public Collection<PluginDesc<Converter>> converters() { + public SortedSet<PluginDesc<Converter>> converters() { return converters; } - public Collection<PluginDesc<HeaderConverter>> headerConverters() { + public SortedSet<PluginDesc<HeaderConverter>> headerConverters() { return headerConverters; } - public Collection<PluginDesc<Transformation<?>>> transformations() { + public SortedSet<PluginDesc<Transformation<?>>> transformations() { return transformations; } - public Collection<PluginDesc<Predicate<?>>> predicates() { + public SortedSet<PluginDesc<Predicate<?>>> predicates() { return predicates; } - public Collection<PluginDesc<ConfigProvider>> configProviders() { + public SortedSet<PluginDesc<ConfigProvider>> configProviders() { return configProviders; } - public Collection<PluginDesc<ConnectRestExtension>> restExtensions() { + public SortedSet<PluginDesc<ConnectRestExtension>> restExtensions() { return restExtensions; } - public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { + public SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { return connectorClientConfigPolicies; } + public void forEach(Consumer<PluginDesc<?>> consumer) { + allPlugins.forEach(plugins -> plugins.forEach(consumer)); + } + public boolean isEmpty() { boolean isEmpty = true; - for (Collection<?> plugins : allPlugins) { + for (SortedSet<?> plugins : allPlugins) { isEmpty = isEmpty && plugins.isEmpty(); } return isEmpty; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index e7e0271d16b..3592e0ec96c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -28,13 +28,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.regex.Pattern; @@ -352,31 +353,6 @@ public class PluginUtils { } } - /** - * Verify whether a given plugin's alias matches another alias in a collection of plugins. - * - * @param alias the plugin descriptor to test for alias matching. - * @param plugins the collection of plugins to test against. - * @param <U> the plugin type. - * @return false if a match was found in the collection, otherwise true. - */ - public static <U> boolean isAliasUnique( - PluginDesc<U> alias, - Collection<PluginDesc<U>> plugins - ) { - boolean matched = false; - for (PluginDesc<U> plugin : plugins) { - if (simpleName(alias).equals(simpleName(plugin)) - || prunedName(alias).equals(prunedName(plugin))) { - if (matched) { - return false; - } - matched = true; - } - } - return true; - } - private static String prunePluginName(PluginDesc<?> plugin, String suffix) { String simple = plugin.pluginClass().getSimpleName(); int pos = simple.lastIndexOf(suffix); @@ -386,6 +362,25 @@ public class PluginUtils { return simple; } + public static Map<String, String> computeAliases(PluginScanResult scanResult) { + Map<String, Set<String>> aliasCollisions = new HashMap<>(); + scanResult.forEach(pluginDesc -> { + aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + }); + Map<String, String> aliases = new HashMap<>(); + for (Map.Entry<String, Set<String>> entry : aliasCollisions.entrySet()) { + String alias = entry.getKey(); + Set<String> classNames = entry.getValue(); + if (classNames.size() == 1) { + aliases.put(alias, classNames.stream().findAny().get()); + } else { + log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames); + } + } + return aliases; + } + private static class DirectoryEntry { final DirectoryStream<Path> stream; final Iterator<Path> iterator; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index cc763ae8dbc..a43f0f226a2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -45,6 +45,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; public class Plugins { @@ -55,6 +56,7 @@ public class Plugins { private static final Logger log = LoggerFactory.getLogger(Plugins.class); private final DelegatingClassLoader delegatingLoader; + private final PluginScanResult scanResult; public Plugins(Map<String, String> props) { this(props, Plugins.class.getClassLoader()); @@ -65,7 +67,7 @@ public class Plugins { String pluginPath = WorkerConfig.pluginPath(props); List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = newDelegatingClassLoader(pluginLocations, parent); - delegatingLoader.initLoaders(); + scanResult = delegatingLoader.initLoaders(); } // VisibleForTesting @@ -194,32 +196,39 @@ public class Plugins { return delegatingLoader.connectorLoader(connectorClassOrAlias); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public Set<PluginDesc<Connector>> connectors() { + Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors()); + connectors.addAll((Set) sourceConnectors()); + return connectors; + } + public Set<PluginDesc<SinkConnector>> sinkConnectors() { - return delegatingLoader.sinkConnectors(); + return scanResult.sinkConnectors(); } public Set<PluginDesc<SourceConnector>> sourceConnectors() { - return delegatingLoader.sourceConnectors(); + return scanResult.sourceConnectors(); } public Set<PluginDesc<Converter>> converters() { - return delegatingLoader.converters(); + return scanResult.converters(); } public Set<PluginDesc<HeaderConverter>> headerConverters() { - return delegatingLoader.headerConverters(); + return scanResult.headerConverters(); } public Set<PluginDesc<Transformation<?>>> transformations() { - return delegatingLoader.transformations(); + return scanResult.transformations(); } public Set<PluginDesc<Predicate<?>>> predicates() { - return delegatingLoader.predicates(); + return scanResult.predicates(); } public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() { - return delegatingLoader.connectorClientConfigPolicies(); + return scanResult.connectorClientConfigPolicies(); } public Object newPlugin(String classOrAlias) throws ClassNotFoundException { @@ -242,7 +251,7 @@ public class Plugins { ); } catch (ClassNotFoundException e) { List<PluginDesc<? extends Connector>> matches = new ArrayList<>(); - Set<PluginDesc<Connector>> connectors = delegatingLoader.connectors(); + Set<PluginDesc<Connector>> connectors = connectors(); for (PluginDesc<? extends Connector> plugin : connectors) { Class<?> pluginClass = plugin.pluginClass(); String simpleName = pluginClass.getSimpleName(); @@ -300,7 +309,7 @@ public class Plugins { // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. - klass = pluginClassFromConfig(config, classPropertyName, Converter.class, delegatingLoader.converters()); + klass = pluginClassFromConfig(config, classPropertyName, Converter.class, scanResult.converters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback @@ -311,7 +320,7 @@ public class Plugins { throw new ConnectException( "Failed to find any class that implements Converter and which name matches " + converterClassOrAlias + ", available converters are: " - + pluginNames(delegatingLoader.converters()) + + pluginNames(scanResult.converters()) ); } break; @@ -383,7 +392,7 @@ public class Plugins { // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, delegatingLoader.headerConverters()); + klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. @@ -400,7 +409,7 @@ public class Plugins { "Failed to find any class that implements HeaderConverter and which name matches " + converterClassOrAlias + ", available header converters are: " - + pluginNames(delegatingLoader.headerConverters()) + + pluginNames(scanResult.headerConverters()) ); } } @@ -432,7 +441,7 @@ public class Plugins { switch (classLoaderUsage) { case CURRENT_CLASSLOADER: // Attempt to load first with the current classloader, and plugins as a fallback. - klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, delegatingLoader.configProviders()); + klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, scanResult.configProviders()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback @@ -443,7 +452,7 @@ public class Plugins { throw new ConnectException( "Failed to find any class that implements ConfigProvider and which name matches " + configProviderClassOrAlias + ", available ConfigProviders are: " - + pluginNames(delegatingLoader.configProviders()) + + pluginNames(scanResult.configProviders()) ); } break; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 72a2493e7f0..affd7df268c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Before; import org.junit.Test; @@ -223,6 +224,20 @@ public class PluginDescTest { ); assertNewer(transformDescPluginPath, transformDescClasspath); + + PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>( + Predicate.class, + regularVersion, + pluginLoader + ); + + PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>( + Predicate.class, + regularVersion, + systemLoader + ); + + assertNewer(predicateDescPluginPath, predicateDescClasspath); } private static <T> void assertPluginDesc( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index 19766989fcf..3e4c219b0ab 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -16,6 +16,17 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.tools.MockSinkConnector; +import org.apache.kafka.connect.tools.MockSourceConnector; +import org.apache.kafka.connect.transforms.Transformation; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -28,7 +39,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -490,6 +505,173 @@ public class PluginUtilsTest { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> aliases = PluginUtils.computeAliases(result); + Map<String, String> actualAliases = PluginUtils.computeAliases(result); + Map<String, String> expectedAliases = new HashMap<>(); + expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); + expectedAliases.put("MockSink", MockSinkConnector.class.getName()); + expectedAliases.put("MockSourceConnector", MockSourceConnector.class.getName()); + expectedAliases.put("MockSource", MockSourceConnector.class.getName()); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + expectedAliases.put("Colliding", CollidingConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @Test + public void testMultiVersionAlias() { + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>(); + // distinct versions don't cause an alias collision (the class name is the same) + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + assertEquals(2, sinkConnectors.size()); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> actualAliases = PluginUtils.computeAliases(result); + Map<String, String> expectedAliases = new HashMap<>(); + expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); + expectedAliases.put("MockSink", MockSinkConnector.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @Test + public void testCollidingPrunedAlias() { + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>(); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + headerConverters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> actualAliases = PluginUtils.computeAliases(result); + Map<String, String> expectedAliases = new HashMap<>(); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + expectedAliases.put("CollidingHeaderConverter", CollidingHeaderConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @SuppressWarnings("unchecked") + @Test + public void testCollidingSimpleAlias() { + SortedSet<PluginDesc<Converter>> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet<PluginDesc<Transformation<?>>> transformations = new TreeSet<>(); + transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, Colliding.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + Collections.emptySortedSet(), + transformations, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map<String, String> actualAliases = PluginUtils.computeAliases(result); + Map<String, String> expectedAliases = new HashMap<>(); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + public static class CollidingConverter implements Converter { + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + } + + public static class CollidingHeaderConverter implements HeaderConverter { + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map<String, ?> configs) { + } + } + + public static class Colliding<R extends ConnectRecord<R>> implements Transformation<R> { + + @Override + public void configure(Map<String, ?> configs) { + } + + @Override + public R apply(R record) { + return null; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() { + } + } + private void createBasicDirectoryLayout() throws IOException { Files.createDirectories(pluginPath.resolve("connectorA")); Files.createDirectories(pluginPath.resolve("connectorB/deps"));