This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new af0054b5020 KAFKA-18182: KIP-891 Add VersionRange to Plugins and
DelegatingClassLoader APIs (#16984)
af0054b5020 is described below
commit af0054b50204c17be0cc84b8a3ef4575324bcb79
Author: snehashisp <[email protected]>
AuthorDate: Sat Dec 7 22:50:02 2024 +0530
KAFKA-18182: KIP-891 Add VersionRange to Plugins and DelegatingClassLoader
APIs (#16984)
Reviewers: Greg Harris <[email protected]>
---
build.gradle | 1 +
.../runtime/isolation/DelegatingClassLoader.java | 163 +++++++++--
.../connect/runtime/isolation/PluginDesc.java | 5 +
.../connect/runtime/isolation/PluginUtils.java | 22 +-
.../kafka/connect/runtime/isolation/Plugins.java | 298 ++++++++++++++-------
.../isolation/VersionedPluginLoadingException.java | 40 +++
.../runtime/isolation/SynchronizationTest.java | 5 +-
7 files changed, 416 insertions(+), 118 deletions(-)
diff --git a/build.gradle b/build.gradle
index 0dadfa4beac..5aeec1b5033 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3518,6 +3518,7 @@ project(':connect:runtime') {
implementation libs.slf4jApi
implementation libs.reload4j
+ implementation libs.slf4jReload4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT
validation
implementation libs.jacksonAnnotations
implementation libs.jacksonJaxrsJsonProvider
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index fdbadef7b69..2ca7979a526 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,17 +16,23 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.apache.maven.artifact.versioning.ArtifactVersion;
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
/**
* A custom classloader dedicated to loading Connect plugin classes in
classloading isolation.
@@ -69,36 +75,108 @@ public class DelegatingClassLoader extends URLClassLoader {
/**
* Retrieve the PluginClassLoader associated with a plugin class
+ *
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null
if the plugin is not isolated.
*/
// VisibleForTesting
- PluginClassLoader pluginClassLoader(String name) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}
+
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
- ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+ ClassLoader pluginLoader = findPluginLoader(inner, name, range);
return pluginLoader instanceof PluginClassLoader
- ? (PluginClassLoader) pluginLoader
- : null;
+ ? (PluginClassLoader) pluginLoader
+ : null;
}
- ClassLoader connectorLoader(String connectorClassOrAlias) {
- String fullName = aliases.getOrDefault(connectorClassOrAlias,
connectorClassOrAlias);
- ClassLoader classLoader = pluginClassLoader(fullName);
- if (classLoader == null) classLoader = this;
+ PluginClassLoader pluginClassLoader(String name) {
+ return pluginClassLoader(name, null);
+ }
+
+ ClassLoader loader(String classOrAlias, VersionRange range) {
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ ClassLoader classLoader = pluginClassLoader(fullName, range);
+ if (classLoader == null) {
+ classLoader = this;
+ }
log.debug(
- "Getting plugin class loader: '{}' for connector: {}",
- classLoader,
- connectorClassOrAlias
+ "Got plugin class loader: '{}' for connector: {}",
+ classLoader,
+ classOrAlias
);
return classLoader;
}
+ ClassLoader loader(String classOrAlias) {
+ return loader(classOrAlias, null);
+ }
+
+ ClassLoader connectorLoader(String connectorClassOrAlias) {
+ return loader(connectorClassOrAlias);
+ }
+
+ String resolveFullClassName(String classOrAlias) {
+ return aliases.getOrDefault(classOrAlias, classOrAlias);
+ }
+
+ String latestVersion(String classOrAlias) {
+ if (classOrAlias == null) {
+ return null;
+ }
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ SortedMap<PluginDesc<?>, ClassLoader> inner =
pluginLoaders.get(fullName);
+ if (inner == null) {
+ return null;
+ }
+ return inner.lastKey().version();
+ }
+
+ private ClassLoader findPluginLoader(
+ SortedMap<PluginDesc<?>, ClassLoader> loaders,
+ String pluginName,
+ VersionRange range
+ ) {
+
+ if (range != null) {
+
+ if (null != range.getRecommendedVersion()) {
+ throw new VersionedPluginLoadingException(String.format("A
soft version range is not supported for plugin loading, "
+ + "this is an internal error as connect should
automatically convert soft ranges to hard ranges. "
+ + "Provided soft version: %s ", range));
+ }
+
+ ArtifactVersion version = null;
+ ClassLoader loader = null;
+ for (Map.Entry<PluginDesc<?>, ClassLoader> entry :
loaders.entrySet()) {
+ // the entries should be in sorted order of versions so this
should end up picking the latest version which matches the range
+ if (range.containsVersion(entry.getKey().encodedVersion())) {
+ loader = entry.getValue();
+ }
+ }
+
+ if (loader == null) {
+ List<String> availableVersions =
loaders.keySet().stream().map(PluginDesc::version).collect(Collectors.toList());
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s not found that matches the version range
%s, available versions: %s",
+ pluginName,
+ range,
+ availableVersions
+ ), availableVersions);
+ }
+ return loader;
+ }
+
+ return loaders.get(loaders.lastKey());
+ }
+
public void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
@@ -112,21 +190,72 @@ public class DelegatingClassLoader extends URLClassLoader
{
@Override
protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
+ return loadVersionedPluginClass(name, null, resolve);
+ }
+
+ protected Class<?> loadVersionedPluginClass(
+ String name,
+ VersionRange range,
+ boolean resolve
+ ) throws VersionedPluginLoadingException, ClassNotFoundException {
+
String fullName = aliases.getOrDefault(name, name);
- PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+ PluginClassLoader pluginLoader = pluginClassLoader(fullName, range);
+ Class<?> plugin;
if (pluginLoader != null) {
- log.trace("Retrieving loaded class '{}' from '{}'", fullName,
pluginLoader);
- return pluginLoader.loadClass(fullName, resolve);
+ log.trace("Retrieving loaded class '{}' from '{}'", name,
pluginLoader);
+ plugin = pluginLoader.loadClass(fullName, resolve);
+ } else {
+ plugin = super.loadClass(fullName, resolve);
+ if (range == null) {
+ return plugin;
+ }
+ verifyClasspathVersionedPlugin(name, plugin, range);
+ }
+ return plugin;
+ }
+
+ private void verifyClasspathVersionedPlugin(String name, Class<?> plugin,
VersionRange range) throws VersionedPluginLoadingException {
+ String pluginVersion;
+ SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin =
pluginLoaders.get(name);
+
+ if (scannedPlugin == null) {
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s is not part of Connect's plugin loading
mechanism (ClassPath or Plugin Path)",
+ name
+ ));
}
- return super.loadClass(fullName, resolve);
+ List<PluginDesc<?>> classpathPlugins = scannedPlugin.keySet().stream()
+ .filter(pluginDesc ->
pluginDesc.location().equals("classpath"))
+ .collect(Collectors.toList());
+
+ if (classpathPlugins.size() > 1) {
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s has multiple versions specified in class path, "
+ + "only one version is allowed in class path for
loading a plugin with version range",
+ name
+ ));
+ } else if (classpathPlugins.isEmpty()) {
+ throw new VersionedPluginLoadingException("Invalid plugin found in
classpath");
+ } else {
+ pluginVersion = classpathPlugins.get(0).version();
+ if (!range.containsVersion(new
DefaultArtifactVersion(pluginVersion))) {
+ throw new VersionedPluginLoadingException(String.format(
+ "Plugin %s has version %s which does not match the
required version range %s",
+ name,
+ pluginVersion,
+ range
+ ), Collections.singletonList(pluginVersion));
+ }
+ }
}
private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>>
computePluginLoaders(PluginScanResult plugins) {
Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new
HashMap<>();
plugins.forEach(pluginDesc ->
- pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
- .put(pluginDesc, pluginDesc.loader()));
+ pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new
TreeMap<>())
+ .put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
index a58aef7ceca..ff575e8edf8 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
@@ -59,6 +60,10 @@ public class PluginDesc<T> implements
Comparable<PluginDesc<?>> {
", location='" + location + '\'' +
'}';
}
+ @JsonIgnore
+ DefaultArtifactVersion encodedVersion() {
+ return encodedVersion;
+ }
public Class<? extends T> pluginClass() {
return klass;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 932e87395f7..56567b3bee7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -469,7 +471,7 @@ public class PluginUtils {
}
return distinctUrls(urls);
}
-
+
private static Collection<URL> forClassLoader(ClassLoader classLoader) {
final Collection<URL> result = new ArrayList<>();
while (classLoader != null) {
@@ -483,7 +485,7 @@ public class PluginUtils {
}
return distinctUrls(result);
}
-
+
private static Collection<URL> distinctUrls(Collection<URL> urls) {
Map<String, URL> distinct = new HashMap<>(urls.size());
for (URL url : urls) {
@@ -491,4 +493,20 @@ public class PluginUtils {
}
return distinct.values();
}
+ public static VersionRange connectorVersionRequirement(String version)
throws InvalidVersionSpecificationException {
+ if (version == null || version.equals("latest")) {
+ return null;
+ }
+ version = version.trim();
+
+ // check first if the given version is valid
+ VersionRange.createFromVersionSpec(version);
+
+ // now if the version is not enclosed we consider it as a hard
requirement and enclose it in []
+ if (!version.startsWith("[") && !version.startsWith("(")) {
+ version = "[" + version + "]";
+ }
+ return VersionRange.createFromVersionSpec(version);
+ }
+
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 816f870157e..28c1f80c618 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -35,6 +35,8 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class Plugins {
@@ -168,11 +171,20 @@ public class Plugins {
String classOrAlias,
Class<U> pluginClass
) throws ClassNotFoundException {
- Class<?> klass = loader.loadClass(classOrAlias, false);
+ return pluginClass(loader, classOrAlias, pluginClass, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected static <U> Class<? extends U> pluginClass(
+ DelegatingClassLoader loader,
+ String classOrAlias,
+ Class<U> pluginClass,
+ VersionRange range
+ ) throws VersionedPluginLoadingException, ClassNotFoundException {
+ Class<?> klass = loader.loadVersionedPluginClass(classOrAlias, range,
false);
if (pluginClass.isAssignableFrom(klass)) {
return (Class<? extends U>) klass;
}
-
throw new ClassNotFoundException(
"Requested class: "
+ classOrAlias
@@ -184,6 +196,10 @@ public class Plugins {
return pluginClass(delegatingLoader, classOrAlias, Object.class);
}
+ public Class<?> pluginClass(String classOrAlias, VersionRange range)
throws VersionedPluginLoadingException, ClassNotFoundException {
+ return pluginClass(delegatingLoader, classOrAlias, Object.class,
range);
+ }
+
public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader();
if (!current.equals(loader)) {
@@ -240,14 +256,37 @@ public class Plugins {
};
}
+ public Function<ClassLoader, LoaderSwap> safeLoaderSwapper() {
+ return loader -> {
+ if (!(loader instanceof PluginClassLoader)) {
+ loader = delegatingLoader;
+ }
+ return withClassLoader(loader);
+ };
+ }
+
+ public String latestVersion(String classOrAlias) {
+ return delegatingLoader.latestVersion(classOrAlias);
+ }
+
public DelegatingClassLoader delegatingLoader() {
return delegatingLoader;
}
+ // kept for compatibility
public ClassLoader connectorLoader(String connectorClassOrAlias) {
- return delegatingLoader.connectorLoader(connectorClassOrAlias);
+ return delegatingLoader.loader(connectorClassOrAlias);
+ }
+
+ public ClassLoader pluginLoader(String classOrAlias, VersionRange range)
throws ClassNotFoundException, VersionedPluginLoadingException {
+ return delegatingLoader.loader(classOrAlias, range);
}
+ public ClassLoader pluginLoader(String classOrAlias) {
+ return delegatingLoader.loader(classOrAlias);
+ }
+
+
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<PluginDesc<Connector>> connectors() {
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set)
sinkConnectors());
@@ -259,48 +298,89 @@ public class Plugins {
return scanResult.sinkConnectors();
}
+ public Set<PluginDesc<SinkConnector>> sinkConnectors(String
connectorClassOrAlias) {
+ return pluginsOfClass(connectorClassOrAlias,
scanResult.sinkConnectors());
+ }
+
public Set<PluginDesc<SourceConnector>> sourceConnectors() {
return scanResult.sourceConnectors();
}
+ public Set<PluginDesc<SourceConnector>> sourceConnectors(String
connectorClassOrAlias) {
+ return pluginsOfClass(connectorClassOrAlias,
scanResult.sourceConnectors());
+ }
+
public Set<PluginDesc<Converter>> converters() {
return scanResult.converters();
}
+ Set<PluginDesc<Converter>> converters(String converterClassOrAlias) {
+ return pluginsOfClass(converterClassOrAlias, scanResult.converters());
+ }
+
public Set<PluginDesc<HeaderConverter>> headerConverters() {
return scanResult.headerConverters();
}
+ Set<PluginDesc<HeaderConverter>> headerConverters(String
headerConverterClassOrAlias) {
+ return pluginsOfClass(headerConverterClassOrAlias,
scanResult.headerConverters());
+ }
+
public Set<PluginDesc<Transformation<?>>> transformations() {
return scanResult.transformations();
}
+ Set<PluginDesc<Transformation<?>>> transformations(String
transformationClassOrAlias) {
+ return pluginsOfClass(transformationClassOrAlias,
scanResult.transformations());
+ }
+
public Set<PluginDesc<Predicate<?>>> predicates() {
return scanResult.predicates();
}
+ Set<PluginDesc<Predicate<?>>> predicates(String predicateClassOrAlias) {
+ return pluginsOfClass(predicateClassOrAlias, scanResult.predicates());
+ }
+
public Set<PluginDesc<ConnectorClientConfigOverridePolicy>>
connectorClientConfigPolicies() {
return scanResult.connectorClientConfigPolicies();
}
+ private <T> Set<PluginDesc<T>> pluginsOfClass(String classNameOrAlias,
Set<PluginDesc<T>> allPluginsOfType) {
+ String className =
delegatingLoader.resolveFullClassName(classNameOrAlias);
+ Set<PluginDesc<T>> plugins = new TreeSet<>();
+ for (PluginDesc<T> desc : allPluginsOfType) {
+ if (desc.className().equals(className)) {
+ plugins.add(desc);
+ }
+ }
+ return plugins;
+ }
+
public Object newPlugin(String classOrAlias) throws ClassNotFoundException
{
Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class);
return newPlugin(klass);
}
+ public Object newPlugin(String classOrAlias, VersionRange range) throws
VersionedPluginLoadingException, ClassNotFoundException {
+ Class<?> klass = pluginClass(delegatingLoader, classOrAlias,
Object.class, range);
+ return newPlugin(klass);
+ }
+
public Connector newConnector(String connectorClassOrAlias) {
Class<? extends Connector> klass =
connectorClass(connectorClassOrAlias);
return newPlugin(klass);
}
- public Class<? extends Connector> connectorClass(String
connectorClassOrAlias) {
+ public Connector newConnector(String connectorClassOrAlias, VersionRange
range) throws VersionedPluginLoadingException {
+ Class<? extends Connector> klass =
connectorClass(connectorClassOrAlias, range);
+ return newPlugin(klass);
+ }
+
+ public Class<? extends Connector> connectorClass(String
connectorClassOrAlias, VersionRange range) throws
VersionedPluginLoadingException {
Class<? extends Connector> klass;
try {
- klass = pluginClass(
- delegatingLoader,
- connectorClassOrAlias,
- Connector.class
- );
+ klass = pluginClass(delegatingLoader, connectorClassOrAlias,
Connector.class, range);
} catch (ClassNotFoundException e) {
List<PluginDesc<? extends Connector>> matches = new ArrayList<>();
Set<PluginDesc<Connector>> connectors = connectors();
@@ -336,6 +416,10 @@ public class Plugins {
return klass;
}
+ public Class<? extends Connector> connectorClass(String
connectorClassOrAlias) {
+ return connectorClass(connectorClassOrAlias, null);
+ }
+
public Task newTask(Class<? extends Task> taskClass) {
return newPlugin(taskClass);
}
@@ -350,54 +434,49 @@ public class Plugins {
* @throws ConnectException if the {@link Converter} implementation class
could not be found
*/
public Converter newConverter(AbstractConfig config, String
classPropertyName, ClassLoaderUsage classLoaderUsage) {
+ return newConverter(config, classPropertyName, null, classLoaderUsage);
+ }
+
+ /**
+ * Used to get a versioned converter. If the version is specified, it will
always use the plugins classloader.
+ *
+ * @param config the configuration containing the {@link
Converter}'s configuration; may not be null
+ * @param classPropertyName the name of the property that contains the
name of the {@link Converter} class; may not be null
+ * @param versionPropertyName the name of the property that contains the
version of the {@link Converter} class; may not be null
+ * @return the instantiated and configured {@link Converter}; null if the
configuration did not define the specified property
+ * @throws ConnectException if the {@link Converter} implementation class
could not be found,
+ * @throws VersionedPluginLoadingException if the version requested is not
found
+ */
+ public Converter newConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName) {
+ ClassLoaderUsage classLoader = config.getString(versionPropertyName)
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER : ClassLoaderUsage.PLUGINS;
+ return newConverter(config, classPropertyName, versionPropertyName,
classLoader);
+ }
+
+ private Converter newConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName, ClassLoaderUsage
classLoaderUsage) {
if (!config.originals().containsKey(classPropertyName)) {
// This configuration does not define the converter via the
specified property name
return null;
}
-
- Class<? extends Converter> klass = null;
- switch (classLoaderUsage) {
- case CURRENT_CLASSLOADER:
- // Attempt to load first with the current classloader, and
plugins as a fallback.
- // Note: we can't use config.getConfiguredInstance because
Converter doesn't implement Configurable, and even if it did
- // we have to remove the property prefixes before calling
config(...) and we still always want to call Converter.config.
- klass = pluginClassFromConfig(config, classPropertyName,
Converter.class, scanResult.converters());
- break;
- case PLUGINS:
- // Attempt to load with the plugin class loader, which uses
the current classloader as a fallback
- String converterClassOrAlias =
config.getClass(classPropertyName).getName();
- try {
- klass = pluginClass(delegatingLoader,
converterClassOrAlias, Converter.class);
- } catch (ClassNotFoundException e) {
- throw new ConnectException(
- "Failed to find any class that implements
Converter and which name matches "
- + converterClassOrAlias + ", available converters
are: "
- + pluginNames(scanResult.converters())
- );
- }
- break;
- }
- if (klass == null) {
- throw new ConnectException("Unable to initialize the Converter
specified in '" + classPropertyName + "'");
- }
-
// Determine whether this is a key or value converter based upon the
supplied property name ...
final boolean isKeyConverter =
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
// Configure the Converter using only the old configuration mechanism
...
String configPrefix = classPropertyName + ".";
Map<String, Object> converterConfig =
config.originalsWithPrefix(configPrefix);
+
log.debug("Configuring the {} converter with configuration keys:{}{}",
- isKeyConverter ? "key" : "value", System.lineSeparator(),
converterConfig.keySet());
+ isKeyConverter ? "key" : "value", System.lineSeparator(),
converterConfig.keySet());
- Converter plugin;
- try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
- plugin = newPlugin(klass);
+ Converter plugin = newVersionedPlugin(config, classPropertyName,
versionPropertyName,
+ Converter.class, classLoaderUsage, scanResult.converters());
+ try (LoaderSwap loaderSwap =
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(converterConfig, isKeyConverter);
}
return plugin;
}
+
+
/**
* Load an internal converter, used by the worker for (de)serializing data
in internal topics.
*
@@ -427,99 +506,124 @@ public class Plugins {
* If the given configuration defines a {@link HeaderConverter} using the
named configuration property, return a new configured
* instance.
*
- * @param config the configuration containing the {@link
Converter}'s configuration; may not be null
- * @param classPropertyName the name of the property that contains the
name of the {@link Converter} class; may not be null
- * @param classLoaderUsage which classloader should be used
+ * @param config the configuration containing the {@link
HeaderConverter}'s configuration; may not be null
+ * @param classPropertyName the name of the property that contains the
name of the {@link HeaderConverter} class; may not be null
+ * @param classLoaderUsage the name of the property that contains the
version of the {@link HeaderConverter} class; may not be null
* @return the instantiated and configured {@link HeaderConverter}; null
if the configuration did not define the specified property
* @throws ConnectException if the {@link HeaderConverter} implementation
class could not be found
*/
public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, ClassLoaderUsage classLoaderUsage) {
- Class<? extends HeaderConverter> klass = null;
- switch (classLoaderUsage) {
- case CURRENT_CLASSLOADER:
- if (!config.originals().containsKey(classPropertyName)) {
- // This connector configuration does not define the header
converter via the specified property name
- return null;
- }
- // Attempt to load first with the current classloader, and
plugins as a fallback.
- // Note: we can't use config.getConfiguredInstance because we
have to remove the property prefixes
- // before calling config(...)
- klass = pluginClassFromConfig(config, classPropertyName,
HeaderConverter.class, scanResult.headerConverters());
- break;
- case PLUGINS:
- // Attempt to load with the plugin class loader, which uses
the current classloader as a fallback.
- // Note that there will always be at least a default header
converter for the worker
- String converterClassOrAlias =
config.getClass(classPropertyName).getName();
- try {
- klass = pluginClass(
- delegatingLoader,
- converterClassOrAlias,
- HeaderConverter.class
- );
- } catch (ClassNotFoundException e) {
- throw new ConnectException(
- "Failed to find any class that implements
HeaderConverter and which name matches "
- + converterClassOrAlias
- + ", available header converters are: "
- +
pluginNames(scanResult.headerConverters())
- );
- }
- }
- if (klass == null) {
- throw new ConnectException("Unable to initialize the
HeaderConverter specified in '" + classPropertyName + "'");
+ return newHeaderConverter(config, classPropertyName, null,
classLoaderUsage);
+ }
+
+ /**
+ * If the given configuration defines a {@link HeaderConverter} using the
named configuration property, return a new configured
+ * instance. If the version is specified, it will always use the plugins
classloader.
+ *
+ * @param config the configuration containing the {@link
HeaderConverter}'s configuration; may not be null
+ * @param classPropertyName the name of the property that contains the
name of the {@link HeaderConverter} class; may not be null
+ * @param versionPropertyName the config for the version for the header
converter
+ * @return the instantiated and configured {@link HeaderConverter}; null
if the configuration did not define the specified property
+ * @throws ConnectException if the {@link HeaderConverter} implementation
class could not be found
+ */
+ public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName) {
+ ClassLoaderUsage classLoader = config.getString(versionPropertyName)
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER : ClassLoaderUsage.PLUGINS;
+ return newHeaderConverter(config, classPropertyName,
versionPropertyName, classLoader);
+ }
+
+ private HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName, ClassLoaderUsage
classLoaderUsage) {
+ if (!config.originals().containsKey(classPropertyName) &&
classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+ // This configuration does not define the Header Converter via the
specified property name
+ return null;
}
+ HeaderConverter plugin = newVersionedPlugin(config, classPropertyName,
versionPropertyName,
+ HeaderConverter.class, classLoaderUsage,
scanResult.headerConverters());
String configPrefix = classPropertyName + ".";
Map<String, Object> converterConfig =
config.originalsWithPrefix(configPrefix);
converterConfig.put(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName());
log.debug("Configuring the header converter with configuration
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
- HeaderConverter plugin;
- try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
- plugin = newPlugin(klass);
+ try (LoaderSwap loaderSwap =
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(converterConfig);
}
return plugin;
}
- public ConfigProvider newConfigProvider(AbstractConfig config, String
providerPrefix, ClassLoaderUsage classLoaderUsage) {
- String classPropertyName = providerPrefix + ".class";
- Map<String, String> originalConfig = config.originalsStrings();
- if (!originalConfig.containsKey(classPropertyName)) {
- // This configuration does not define the config provider via the
specified property name
- return null;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private <U> U newVersionedPlugin(
+ AbstractConfig config,
+ String classPropertyName,
+ String versionPropertyName,
+ Class basePluginClass,
+ ClassLoaderUsage classLoaderUsage,
+ SortedSet<PluginDesc<U>> availablePlugins
+ ) {
+
+ String version = versionPropertyName == null ? null :
config.getString(versionPropertyName);
+ VersionRange range = null;
+ if (version != null) {
+ try {
+ range = PluginUtils.connectorVersionRequirement(version);
+ } catch (InvalidVersionSpecificationException e) {
+ throw new ConnectException(String.format("Invalid version
range for %s: %s", classPropertyName, version), e);
+ }
}
- Class<? extends ConfigProvider> klass = null;
+
+ assert range == null || classLoaderUsage == ClassLoaderUsage.PLUGINS;
+
+ Class<? extends U> klass = null;
+ String basePluginClassName = basePluginClass.getSimpleName();
switch (classLoaderUsage) {
case CURRENT_CLASSLOADER:
// Attempt to load first with the current classloader, and
plugins as a fallback.
- klass = pluginClassFromConfig(config, classPropertyName,
ConfigProvider.class, scanResult.configProviders());
+ klass = pluginClassFromConfig(config, classPropertyName,
basePluginClass, availablePlugins);
break;
case PLUGINS:
// Attempt to load with the plugin class loader, which uses
the current classloader as a fallback
- String configProviderClassOrAlias =
originalConfig.get(classPropertyName);
+
+ // if the config specifies the class name, use it, otherwise
use the default which we can get from config.getClass
+ String classOrAlias =
config.originalsStrings().get(classPropertyName);
+ if (classOrAlias == null) {
+ classOrAlias =
config.getClass(classPropertyName).getName();
+ }
try {
- klass = pluginClass(delegatingLoader,
configProviderClassOrAlias, ConfigProvider.class);
+ klass = pluginClass(delegatingLoader, classOrAlias,
basePluginClass, range);
} catch (ClassNotFoundException e) {
throw new ConnectException(
- "Failed to find any class that implements
ConfigProvider and which name matches "
- + configProviderClassOrAlias + ",
available ConfigProviders are: "
- + pluginNames(scanResult.configProviders())
+ "Failed to find any class that implements " +
basePluginClassName + " and which name matches "
+ + classOrAlias + ", available plugins are:
"
+ + pluginNames(availablePlugins)
);
}
break;
}
if (klass == null) {
- throw new ConnectException("Unable to initialize the
ConfigProvider specified in '" + classPropertyName + "'");
+ throw new ConnectException("Unable to initialize the " +
basePluginClassName
+ + " specified in " + classPropertyName);
}
+ U plugin;
+ try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
+ plugin = newPlugin(klass);
+ }
+ return plugin;
+ }
+
+ public ConfigProvider newConfigProvider(AbstractConfig config, String
providerPrefix, ClassLoaderUsage classLoaderUsage) {
+ String classPropertyName = providerPrefix + ".class";
+ Map<String, String> originalConfig = config.originalsStrings();
+ if (!originalConfig.containsKey(classPropertyName)) {
+ // This configuration does not define the config provider via the
specified property name
+ return null;
+ }
+
+ ConfigProvider plugin = newVersionedPlugin(config, classPropertyName,
null, ConfigProvider.class, classLoaderUsage, scanResult.configProviders());
+
// Configure the ConfigProvider
String configPrefix = providerPrefix + ".param.";
Map<String, Object> configProviderConfig =
config.originalsWithPrefix(configPrefix);
-
- ConfigProvider plugin;
- try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
- plugin = newPlugin(klass);
+ try (LoaderSwap loaderSwap =
safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) {
plugin.configure(configProviderConfig);
}
return plugin;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
new file mode 100644
index 00000000000..8fb3042549b
--- /dev/null
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginLoadingException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.List;
+
+public class VersionedPluginLoadingException extends ConfigException {
+
+ private List<String> availableVersions = null;
+
+ public VersionedPluginLoadingException(String message) {
+ super(message);
+ }
+
+ public VersionedPluginLoadingException(String message, List<String>
availableVersions) {
+ super(message);
+ this.availableVersions = availableVersions;
+ }
+
+ public List<String> availableVersions() {
+ return availableVersions;
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 70b875b21b8..3c990917400 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
+import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -190,10 +191,10 @@ public class SynchronizationTest {
}
@Override
- public PluginClassLoader pluginClassLoader(String name) {
+ public PluginClassLoader pluginClassLoader(String name, VersionRange
range) {
dclBreakpoint.await(name);
dclBreakpoint.await(name);
- return super.pluginClassLoader(name);
+ return super.pluginClassLoader(name, range);
}
}