This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad3369859ca KAFKA-18419: KIP-891 Connect Multiversion Support
(Transformation and Predicate Changes) (#17742)
ad3369859ca is described below
commit ad3369859ca65e41be01278e529bc7b9165c9d09
Author: snehashisp <[email protected]>
AuthorDate: Tue Jan 7 01:48:45 2025 +0530
KAFKA-18419: KIP-891 Connect Multiversion Support (Transformation and
Predicate Changes) (#17742)
Reviewers: Greg Harris <[email protected]>
---
.../kafka/connect/runtime/AbstractHerder.java | 4 +-
.../kafka/connect/runtime/CachedConnectors.java | 11 +-
.../kafka/connect/runtime/ConnectorConfig.java | 156 ++++++++++++++-------
.../runtime/isolation/DelegatingClassLoader.java | 40 +++---
.../kafka/connect/runtime/isolation/Plugins.java | 17 ++-
.../runtime/isolation/PluginsRecommenders.java | 67 +++++++++
.../kafka/connect/runtime/AbstractHerderTest.java | 21 ++-
.../kafka/connect/runtime/ConnectorConfigTest.java | 12 +-
.../connect/runtime/TransformationConfigTest.java | 39 ++++--
9 files changed, 255 insertions(+), 112 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 525fc30b1d6..5984a32d3cd 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -859,9 +859,11 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
addNullValuedErrors(connectorProps, validatedConnectorConfig);
- ConfigInfos connectorConfigInfo =
validateConnectorPluginSpecifiedConfigs(connectorProps,
validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
+ // the order of operations here is important, converter
validations can add error messages to the connector config
+ // which are collected and converted to ConfigInfos in
validateConnectorPluginSpecifiedConfigs
ConfigInfos converterConfigInfo =
validateAllConverterConfigs(connectorProps, validatedConnectorConfig,
connectorLoader, reportStage);
ConfigInfos clientOverrideInfo =
validateClientOverrides(connectorProps, connectorType, connector.getClass(),
reportStage, doLog);
+ ConfigInfos connectorConfigInfo =
validateConnectorPluginSpecifiedConfigs(connectorProps,
validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
return mergeConfigInfos(connType,
connectorConfigInfo,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
index 59c4281fff8..ebfa3522f90 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/CachedConnectors.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import
org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
@@ -31,8 +32,8 @@ public class CachedConnectors {
private static final String LATEST_VERSION = "latest";
private final Map<String, Map<String, Connector>> connectors;
- private final Map<String, Exception> invalidConnectors;
- private final Map<String, Map<String, Exception>> invalidVersions;
+ private final Map<String, Throwable> invalidConnectors;
+ private final Map<String, Map<String, VersionedPluginLoadingException>>
invalidVersions;
private final Plugins plugins;
public CachedConnectors(Plugins plugins) {
@@ -42,14 +43,14 @@ public class CachedConnectors {
this.invalidVersions = new ConcurrentHashMap<>();
}
- private void validate(String connectorName, VersionRange range) throws
Exception {
+ private void validate(String connectorName, VersionRange range) throws
ConnectException, VersionedPluginLoadingException {
if (invalidConnectors.containsKey(connectorName)) {
- throw new Exception(invalidConnectors.get(connectorName));
+ throw new ConnectException(invalidConnectors.get(connectorName));
}
String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) &&
invalidVersions.get(connectorName).containsKey(version)) {
- throw new
Exception(invalidVersions.get(connectorName).get(version));
+ throw new
VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index fb523ac4d75..67102f69cc5 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsRecommenders;
@@ -274,17 +275,17 @@ public class ConnectorConfig extends AbstractConfig {
public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String,
String> connProps, WorkerConfig workerConfig) {
PluginsRecommenders recommender = new PluginsRecommenders(plugins);
ConverterDefaults keyConverterDefaults = converterDefaults(plugins,
KEY_CONVERTER_CLASS_CONFIG,
- WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+ WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig,
PluginType.CONVERTER);
ConverterDefaults valueConverterDefaults = converterDefaults(plugins,
VALUE_CONVERTER_CLASS_CONFIG,
- WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
+ WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig,
PluginType.CONVERTER);
ConverterDefaults headerConverterDefaults = converterDefaults(plugins,
HEADER_CONVERTER_CLASS_CONFIG,
- WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig,
HeaderConverter.class);
- return
configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)),
+ WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig,
PluginType.HEADER_CONVERTER);
+ return
configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
PluginType.SINK, PluginType.SOURCE),
keyConverterDefaults, valueConverterDefaults,
headerConverterDefaults, recommender);
}
public static ConfigDef enrichedConfigDef(Plugins plugins, String
connectorClass) {
- return configDef(plugins.latestVersion(connectorClass),
CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
+ return configDef(plugins.latestVersion(connectorClass,
PluginType.SINK, PluginType.SOURCE), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS,
CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
}
private static ConfigDef.CompositeValidator aliasValidator(String kind) {
@@ -395,10 +396,9 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that
are missing should cause an exception to be thrown.
*/
- @SuppressWarnings({"rawtypes", "unchecked"})
public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef,
Map<String, String> props, boolean requireFullConfig) {
ConfigDef newDef = new ConfigDef(baseConfigDef);
- new EnrichablePlugin<Transformation<?>>("Transformation",
TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
+ new EnrichablePlugin<Transformation<?>>("Transformation",
TRANSFORMS_CONFIG, TRANSFORMS_GROUP, PluginType.TRANSFORMATION,
props, requireFullConfig) {
@Override
@@ -417,8 +417,8 @@ public class ConnectorConfig extends AbstractConfig {
}
@Override
- protected Stream<Map.Entry<String, ConfigDef.ConfigKey>>
configDefsForClass(String typeConfig) {
- return super.configDefsForClass(typeConfig)
+ protected Stream<Map.Entry<String, ConfigDef.ConfigKey>>
configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
+ return super.configDefsForClass(typeConfig, versionConfig,
plugins)
.filter(entry -> {
// The implicit parameters mask any from the
transformer with the same name
if
(TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
@@ -447,10 +447,16 @@ public class ConnectorConfig extends AbstractConfig {
"but there is no config '" + prefixedPredicate +
"' defining a predicate to be negated.");
}
}
- }.enrich(newDef);
+
+ @Override
+ protected ConfigDef.Recommender versionRecommender(String
typeConfig) {
+ return new
PluginsRecommenders(plugins).transformationPluginRecommender(typeConfig);
+ }
+
+ }.enrich(newDef, plugins);
new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG,
PREDICATES_GROUP,
- (Class) Predicate.class, props, requireFullConfig) {
+ PluginType.PREDICATE, props, requireFullConfig) {
@Override
protected Set<PluginDesc<Predicate<?>>> plugins() {
return plugins.predicates();
@@ -460,7 +466,14 @@ public class ConnectorConfig extends AbstractConfig {
protected ConfigDef config(Predicate<?> predicate) {
return predicate.config();
}
- }.enrich(newDef);
+
+ @Override
+ protected ConfigDef.Recommender versionRecommender(String
typeConfig) {
+ return new
PluginsRecommenders(plugins).predicatePluginRecommender(typeConfig);
+ }
+
+ }.enrich(newDef, plugins);
+
return newDef;
}
@@ -471,7 +484,7 @@ public class ConnectorConfig extends AbstractConfig {
String workerConverterVersionConfig,
Map<String, String> connectorProps,
WorkerConfig workerConfig,
- Class<T> converterType
+ PluginType converterType
) {
/*
if a converter is specified in the connector config it overrides the
worker config for the corresponding converter
@@ -510,34 +523,23 @@ public class ConnectorConfig extends AbstractConfig {
String version = null;
if (connectorConverter != null) {
- version = fetchPluginVersion(plugins, connectorConverter,
connectorVersion, connectorConverter);
+ version = fetchPluginVersion(plugins, connectorClass,
connectorVersion, connectorConverter, converterType);
} else {
version =
workerConfig.originalsStrings().get(workerConverterVersionConfig);
if (version == null) {
- version = plugins.latestVersion(workerConverter);
+ version = plugins.latestVersion(workerConverter,
converterType);
}
}
return new ConverterDefaults(type, version);
}
- private static void updateKeyDefault(ConfigDef configDef, String
versionConfigKey, String versionDefault) {
- ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
- if (key == null) {
- return;
- }
- configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
- versionConfigKey, key.type, versionDefault, key.validator,
key.importance, key.documentation, key.group, key.orderInGroup, key.width,
key.displayName, key.dependents, key.recommender, false
- ));
- }
-
- @SuppressWarnings("unchecked")
- private static <T> String fetchPluginVersion(Plugins plugins, String
connectorClass, String connectorVersion, String pluginName) {
- if (pluginName == null) {
+ private static <T> String fetchPluginVersion(Plugins plugins, String
connectorClass, String connectorVersion, String pluginName, PluginType
pluginType) {
+ if (pluginName == null || connectorClass == null) {
return null;
}
try {
VersionRange range =
PluginUtils.connectorVersionRequirement(connectorVersion);
- return plugins.pluginVersion(pluginName,
plugins.pluginLoader(connectorClass, range));
+ return plugins.pluginVersion(pluginName,
plugins.pluginLoader(connectorClass, range), pluginType);
} catch (InvalidVersionSpecificationException |
VersionedPluginLoadingException e) {
// these errors should be captured in other places, so we can
ignore them here
log.warn("Failed to determine default plugin version for {}",
connectorClass, e);
@@ -559,24 +561,27 @@ public class ConnectorConfig extends AbstractConfig {
private final String aliasKind;
private final String aliasConfig;
private final String aliasGroup;
+ private final PluginType pluginType;
private final Class<T> baseClass;
private final Map<String, String> props;
private final boolean requireFullConfig;
+ @SuppressWarnings("unchecked")
public EnrichablePlugin(
String aliasKind,
- String aliasConfig, String aliasGroup, Class<T> baseClass,
+ String aliasConfig, String aliasGroup, PluginType pluginType,
Map<String, String> props, boolean requireFullConfig) {
this.aliasKind = aliasKind;
this.aliasConfig = aliasConfig;
this.aliasGroup = aliasGroup;
- this.baseClass = baseClass;
+ this.pluginType = pluginType;
+ this.baseClass = (Class<T>) pluginType.superClass();
this.props = props;
this.requireFullConfig = requireFullConfig;
}
/** Add the configs for this alias to the given {@code ConfigDef}. */
- void enrich(ConfigDef newDef) {
+ void enrich(ConfigDef newDef, Plugins plugins) {
Object aliases = ConfigDef.parseType(aliasConfig,
props.get(aliasConfig), Type.LIST);
if (!(aliases instanceof List)) {
return;
@@ -593,12 +598,17 @@ public class ConnectorConfig extends AbstractConfig {
int orderInGroup = 0;
final String typeConfig = prefix + "type";
+ final String versionConfig = prefix +
WorkerConfig.PLUGIN_VERSION_SUFFIX;
+ final String defaultVersion = fetchPluginVersion(plugins,
props.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
+ props.get(ConnectorConfig.CONNECTOR_VERSION),
props.get(typeConfig), pluginType);
+
+ // Add the class configuration
final ConfigDef.Validator typeValidator =
ConfigDef.LambdaValidator.with(
(String name, Object value) -> {
validateProps(prefix);
// The value will be null if the class couldn't be
found; no point in performing follow-up validation
if (value != null) {
-
getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+ getConfigDefFromPlugin(typeConfig, ((Class<?>)
value).getName(), props.getOrDefault(versionConfig, defaultVersion), plugins);
}
},
() -> "valid configs for " + alias + " " +
aliasKind.toLowerCase(Locale.ENGLISH));
@@ -607,7 +617,25 @@ public class ConnectorConfig extends AbstractConfig {
baseClass.getSimpleName() + " type for " + alias,
Collections.emptyList(), new ClassRecommender());
- final ConfigDef configDef = populateConfigDef(typeConfig);
+ // Add the version configuration
+ final ConfigDef.Validator versionValidator = (name, value) -> {
+ if (value != null) {
+ try {
+ getConfigDefFromPlugin(typeConfig,
props.get(typeConfig), (String) value, plugins);
+ } catch (VersionedPluginLoadingException e) {
+ throw e;
+ } catch (Exception e) {
+ // ignore any other exception here as they are not
related to version validation and
+ // will be captured in the validation of the class
configuration
+ }
+ }
+ };
+ newDef.define(versionConfig, Type.STRING, defaultVersion,
versionValidator, Importance.HIGH,
+ "Version of the '" + alias + "' " +
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
+ baseClass.getSimpleName() + " version for " + alias,
+ Collections.emptyList(),
versionRecommender(typeConfig));
+
+ final ConfigDef configDef = populateConfigDef(typeConfig,
versionConfig, plugins);
if (configDef == null) continue;
newDef.embed(prefix, group, orderInGroup, configDef);
}
@@ -621,10 +649,10 @@ public class ConnectorConfig extends AbstractConfig {
* Populates the ConfigDef according to the configs returned from
{@code configs()} method of class
* named in the {@code ...type} parameter of the {@code props}.
*/
- protected ConfigDef populateConfigDef(String typeConfig) {
+ protected ConfigDef populateConfigDef(String typeConfig, String
versionConfig, Plugins plugins) {
final ConfigDef configDef = initialConfigDef();
try {
- configDefsForClass(typeConfig)
+ configDefsForClass(typeConfig, versionConfig, plugins)
.forEach(entry -> configDef.define(entry.getValue()));
} catch (ConfigException e) {
if (requireFullConfig) {
@@ -640,9 +668,11 @@ public class ConnectorConfig extends AbstractConfig {
* Return a stream of configs provided by the {@code configs()} method
of class
* named in the {@code ...type} parameter of the {@code props}.
*/
- protected Stream<Map.Entry<String, ConfigDef.ConfigKey>>
configDefsForClass(String typeConfig) {
- final Class<?> cls = (Class<?>) ConfigDef.parseType(typeConfig,
props.get(typeConfig), Type.CLASS);
- return getConfigDefFromConfigProvidingClass(typeConfig, cls)
+ protected Stream<Map.Entry<String, ConfigDef.ConfigKey>>
configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
+ if (props.get(typeConfig) == null) {
+ throw new ConfigException(typeConfig, null, "Not a " +
baseClass.getSimpleName());
+ }
+ return getConfigDefFromPlugin(typeConfig, props.get(typeConfig),
props.get(versionConfig), plugins)
.configKeys().entrySet().stream();
}
@@ -651,30 +681,46 @@ public class ConnectorConfig extends AbstractConfig {
return new ConfigDef();
}
- /**
- * Return {@link ConfigDef} from {@code cls}, which is expected to be
a non-null {@code Class<T>},
- * by instantiating it and invoking {@link #config(T)}.
- * @param key
- * @param cls The subclass of the baseclass.
- */
- ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?>
cls) {
- if (cls == null) {
- throw new ConfigException(key, null, "Not a " +
baseClass.getSimpleName());
+ @SuppressWarnings("unchecked")
+ ConfigDef getConfigDefFromPlugin(String key, String pluginClass,
String version, Plugins plugins) {
+ String connectorClass = props.get(CONNECTOR_CLASS_CONFIG);
+ if (pluginClass == null || connectorClass == null) {
+ // if transformation class is null or connector class is null,
we return empty as these validations are done in respective validators
+ return new ConfigDef();
+ }
+ VersionRange connectorVersionRange;
+ try {
+ connectorVersionRange =
PluginUtils.connectorVersionRequirement(props.get(CONNECTOR_VERSION));
+ } catch (InvalidVersionSpecificationException e) {
+ // this should be caught in connector version validation
+ return new ConfigDef();
}
+
+ VersionRange pluginVersion;
+ try {
+ pluginVersion =
PluginUtils.connectorVersionRequirement(version);
+ } catch (InvalidVersionSpecificationException e) {
+ throw new VersionedPluginLoadingException(e.getMessage());
+ }
+
+ // validate that the plugin class is a subclass of the base class
+ final Class<?> cls = (Class<?>) ConfigDef.parseType(key,
props.get(key), Type.CLASS);
Utils.ensureConcreteSubclass(baseClass, cls);
- T pluginInstance;
+ T plugin;
try {
- pluginInstance = Utils.newInstance(cls, baseClass);
+ plugin = (T) plugins.newPlugin(pluginClass, pluginVersion,
plugins.pluginLoader(connectorClass, connectorVersionRange));
+ } catch (VersionedPluginLoadingException e) {
+ throw e;
} catch (Exception e) {
- throw new ConfigException(key, String.valueOf(cls), "Error
getting config definition from " + baseClass.getSimpleName() + ": " +
e.getMessage());
+ throw new ConfigException(key, pluginClass, "Error getting
config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
- ConfigDef configDef = config(pluginInstance);
+ ConfigDef configDef = config(plugin);
if (null == configDef) {
throw new ConnectException(
String.format(
"%s.config() must return a ConfigDef that is
not null.",
- cls.getName()
+ plugin.getClass().getName()
)
);
}
@@ -694,6 +740,8 @@ public class ConnectorConfig extends AbstractConfig {
*/
protected abstract Set<PluginDesc<T>> plugins();
+ protected abstract ConfigDef.Recommender versionRecommender(String
typeConfig);
+
/**
* Recommend bundled transformations or predicates.
*/
@@ -741,7 +789,7 @@ public class ConnectorConfig extends AbstractConfig {
try {
PluginUtils.connectorVersionRequirement((String) value);
} catch (InvalidVersionSpecificationException e) {
- throw new ConfigException(name, value, e.getMessage());
+ throw new VersionedPluginLoadingException(e.getMessage());
}
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 6a850439287..bdeb224cfde 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.runtime.isolation;
-import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
@@ -28,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@@ -127,19 +127,7 @@ public class DelegatingClassLoader extends URLClassLoader {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}
- String latestVersion(String classOrAlias) {
- if (classOrAlias == null) {
- return null;
- }
- String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
- SortedMap<PluginDesc<?>, ClassLoader> inner =
pluginLoaders.get(fullName);
- if (inner == null) {
- return null;
- }
- return inner.lastKey().version();
- }
-
- String versionInLocation(String classOrAlias, String location) {
+ PluginDesc<?> pluginDesc(String classOrAlias, String preferredLocation,
Set<PluginType> allowedTypes) {
if (classOrAlias == null) {
return null;
}
@@ -148,12 +136,17 @@ public class DelegatingClassLoader extends URLClassLoader
{
if (inner == null) {
return null;
}
+ PluginDesc<?> result = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : inner.entrySet()) {
- if (entry.getKey().location().equals(location)) {
- return entry.getKey().version();
+ if (!allowedTypes.contains(entry.getKey().type())) {
+ continue;
+ }
+ result = entry.getKey();
+ if (result.location().equals(preferredLocation)) {
+ return result;
}
}
- return null;
+ return result;
}
private ClassLoader findPluginLoader(
@@ -170,7 +163,6 @@ public class DelegatingClassLoader extends URLClassLoader {
+ "Provided soft version: %s ", range));
}
- ArtifactVersion version = null;
ClassLoader loader = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry :
loaders.entrySet()) {
// the entries should be in sorted order of versions so this
should end up picking the latest version which matches the range
@@ -227,19 +219,19 @@ public class DelegatingClassLoader extends URLClassLoader
{
if (range == null) {
return plugin;
}
- verifyClasspathVersionedPlugin(name, plugin, range);
+ verifyClasspathVersionedPlugin(fullName, plugin, range);
}
return plugin;
}
- private void verifyClasspathVersionedPlugin(String name, Class<?> plugin,
VersionRange range) throws VersionedPluginLoadingException {
+ private void verifyClasspathVersionedPlugin(String fullName, Class<?>
plugin, VersionRange range) throws VersionedPluginLoadingException {
String pluginVersion;
- SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin =
pluginLoaders.get(name);
+ SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin =
pluginLoaders.get(fullName);
if (scannedPlugin == null) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s is not part of Connect's plugin loading
mechanism (ClassPath or Plugin Path)",
- name
+ fullName
));
}
@@ -255,7 +247,7 @@ public class DelegatingClassLoader extends URLClassLoader {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has multiple versions specified in class path, "
+ "only one version is allowed in class path for
loading a plugin with version range",
- name
+ fullName
));
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in
classpath");
@@ -264,7 +256,7 @@ public class DelegatingClassLoader extends URLClassLoader {
if (!range.containsVersion(new
DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the
required version range %s",
- name,
+ fullName,
pluginVersion,
range
), Collections.singletonList(pluginVersion));
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 310adb40c7f..8be45e773b3 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -265,15 +266,17 @@ public class Plugins {
};
}
- public String latestVersion(String classOrAlias) {
- return delegatingLoader.latestVersion(classOrAlias);
+ public String latestVersion(String classOrAlias, PluginType...
allowedTypes) {
+ return pluginVersion(classOrAlias, null, allowedTypes);
}
- public String pluginVersion(String classOrAlias, ClassLoader sourceLoader)
{
- if (!(sourceLoader instanceof PluginClassLoader)) {
- return latestVersion(classOrAlias);
+ public String pluginVersion(String classOrAlias, ClassLoader sourceLoader,
PluginType... allowedTypes) {
+ String location = (sourceLoader instanceof PluginClassLoader) ?
((PluginClassLoader) sourceLoader).location() : null;
+ PluginDesc<?> desc = delegatingLoader.pluginDesc(classOrAlias,
location, new HashSet<>(Arrays.asList(allowedTypes)));
+ if (desc != null) {
+ return desc.version();
}
- return delegatingLoader.versionInLocation(classOrAlias,
((PluginClassLoader) sourceLoader).location());
+ return null;
}
public DelegatingClassLoader delegatingLoader() {
@@ -376,7 +379,7 @@ public class Plugins {
public Object newPlugin(String classOrAlias, VersionRange range,
ClassLoader sourceLoader) throws ClassNotFoundException {
if (range == null && sourceLoader instanceof PluginClassLoader) {
- sourceLoader.loadClass(classOrAlias);
+ return newPlugin(sourceLoader.loadClass(classOrAlias));
}
return newPlugin(classOrAlias, range);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index 8cf209ac08c..76f28659726 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -19,10 +19,13 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -74,6 +77,14 @@ public class PluginsRecommenders {
return headerConverterPluginVersionRecommender;
}
+ public TransformationPluginRecommender
transformationPluginRecommender(String classOrAlias) {
+ return new TransformationPluginRecommender(classOrAlias);
+ }
+
+ public PredicatePluginRecommender predicatePluginRecommender(String
classOrAlias) {
+ return new PredicatePluginRecommender(classOrAlias);
+ }
+
public class ConnectorPluginVersionRecommender implements
ConfigDef.Recommender {
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -195,4 +206,60 @@ public class PluginsRecommenders {
.map(PluginDesc::version).distinct().collect(Collectors.toList());
}
}
+
+ // Recommender for transformation and predicate plugins
+ public abstract class SMTPluginRecommender<T> implements
ConfigDef.Recommender {
+
+ protected abstract Function<String, Set<PluginDesc<T>>> plugins();
+
+ protected final String classOrAliasConfig;
+
+ public SMTPluginRecommender(String classOrAliasConfig) {
+ this.classOrAliasConfig = classOrAliasConfig;
+ }
+
+ @Override
+ @SuppressWarnings({"rawtypes"})
+ public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
+ if (plugins == null) {
+ return Collections.emptyList();
+ }
+ if (parsedConfig.get(classOrAliasConfig) == null) {
+ return Collections.emptyList();
+ }
+
+ Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
+ return plugins().apply(classOrAlias.getName())
+
.stream().map(PluginDesc::version).distinct().collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean visible(String name, Map<String, Object> parsedConfig) {
+ return true;
+ }
+ }
+
+ public class TransformationPluginRecommender extends
SMTPluginRecommender<Transformation<?>> {
+
+ public TransformationPluginRecommender(String classOrAliasConfig) {
+ super(classOrAliasConfig);
+ }
+
+ @Override
+ protected Function<String, Set<PluginDesc<Transformation<?>>>>
plugins() {
+ return plugins::transformations;
+ }
+ }
+
+ public class PredicatePluginRecommender extends
SMTPluginRecommender<Predicate<?>> {
+
+ public PredicatePluginRecommender(String classOrAliasConfig) {
+ super(classOrAliasConfig);
+ }
+
+ @Override
+ protected Function<String, Set<PluginDesc<Predicate<?>>>> plugins() {
+ return plugins::predicates;
+ }
+ }
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 01a9bcf6373..6a5556fcf50 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -62,6 +62,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@@ -559,12 +560,14 @@ public class AbstractHerderTest {
}
@Test
- public void testConfigValidationTransformsExtendResults() {
+ @SuppressWarnings("rawtypes")
+ public void testConfigValidationTransformsExtendResults() throws
ClassNotFoundException {
final Class<? extends Connector> connectorClass =
SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass,
noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
-
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
// Define 2 transformations. One has a class defined and so can get
embedded configs, the other is missing
// class info that should generate an error.
@@ -575,6 +578,7 @@ public class AbstractHerderTest {
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type",
SampleTransformation.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config, s -> null,
false);
+
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
// We expect there to be errors due to the missing name and .... Note
that these assertions depend heavily on
@@ -596,7 +600,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(),
Function.identity()));
- assertEquals(31, infos.size());
+ assertEquals(33, infos.size());
// Should get 2 type fields from the transforms, first adds its own
config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@@ -611,12 +615,15 @@ public class AbstractHerderTest {
}
@Test
- public void testConfigValidationPredicatesExtendResults() {
+ @SuppressWarnings("rawtypes")
+ public void testConfigValidationPredicatesExtendResults() throws
ClassNotFoundException {
final Class<? extends Connector> connectorClass =
SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass,
noneConnectorClientConfigOverridePolicy);
-
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
-
when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
+
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(),
null, classLoader)).thenReturn(new SampleTransformation());
+
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null,
classLoader)).thenReturn(new SamplePredicate());
// Define 2 predicates. One has a class defined and so can get
embedded configs, the other is missing
// class info that should generate an error.
@@ -653,7 +660,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(),
Function.identity()));
- assertEquals(33, infos.size());
+ assertEquals(36, infos.size());
// Should get 2 type fields from the transforms, first adds its own
config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 6092f8ca7bd..a5af1d13469 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ConnectorConfigTest<R extends ConnectRecord<R>> {
@@ -455,13 +457,19 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
}
@Test
- public void testEnrichedConfigDef() {
+ @SuppressWarnings("rawtypes")
+ public void testEnrichedConfigDef() throws ClassNotFoundException {
String alias = "hdt";
String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
TestConnector.class.getName());
props.put(prefix + "type",
HasDuplicateConfigTransformation.class.getName());
- ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(),
props, false);
+ Plugins mockPlugins = mock(Plugins.class);
+
when(mockPlugins.newPlugin(HasDuplicateConfigTransformation.class.getName(),
+ null, (ClassLoader) null)).thenReturn(new
HasDuplicateConfigTransformation());
+ when(mockPlugins.transformations()).thenReturn(Collections.emptySet());
+ ConfigDef def = ConnectorConfig.enrich(mockPlugins, new ConfigDef(),
props, false);
assertEnrichedConfigDef(def, prefix,
HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
assertEnrichedConfigDef(def, prefix,
TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
assertEnrichedConfigDef(def, prefix,
TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
index 2a6c0ed2b9d..ef7f17e1d09 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationConfigTest.java
@@ -29,12 +29,16 @@ import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.TimestampRouter;
+import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.ValueToKey;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Tests that transformations' configs can be composed with ConnectorConfig
during its construction, ensuring no
* conflicting fields or other issues.
@@ -42,8 +46,19 @@ import java.util.HashMap;
* This test appears here simply because it requires both connect-runtime and
connect-transforms and connect-runtime
* already depends on connect-transforms.
*/
+@SuppressWarnings("rawtypes")
public class TransformationConfigTest {
+ private Plugins setupMockPlugins(Transformation transformation) {
+ Plugins plugins = mock(Plugins.class);
+ try {
+ when(plugins.newPlugin(transformation.getClass().getName(), null,
(ClassLoader) null)).thenReturn(transformation);
+ } catch (ClassNotFoundException e) {
+ // Shouldn't happen since we're mocking the plugins
+ }
+ return plugins;
+ }
+
@Test
public void testEmbeddedConfigCast() {
// Validate that we can construct a Connector config containing the
extended config for the transform
@@ -54,7 +69,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", Cast.Value.class.getName());
connProps.put("transforms.example.spec", "int8");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new Cast.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -68,7 +83,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type",
ExtractField.Value.class.getName());
connProps.put("transforms.example.field", "field");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new ExtractField.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -81,7 +96,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type",
Flatten.Value.class.getName());
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new Flatten.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -95,7 +110,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type",
HoistField.Value.class.getName());
connProps.put("transforms.example.field", "field");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new HoistField.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -108,7 +123,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type",
InsertField.Value.class.getName());
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new InsertField.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -123,7 +138,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.fields", "field");
connProps.put("transforms.example.replacement", "nothing");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new MaskField.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -138,7 +153,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.regex", "(.*)");
connProps.put("transforms.example.replacement", "prefix-$1");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new RegexRouter());
new ConnectorConfig(plugins, connProps);
}
@@ -151,7 +166,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type",
ReplaceField.Value.class.getName());
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new ReplaceField.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -164,7 +179,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type",
SetSchemaMetadata.Value.class.getName());
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new SetSchemaMetadata.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -178,7 +193,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type",
TimestampConverter.Value.class.getName());
connProps.put("transforms.example.target.type", "unix");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new TimestampConverter.Value());
new ConnectorConfig(plugins, connProps);
}
@@ -191,7 +206,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type",
TimestampRouter.class.getName());
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new TimestampRouter());
new ConnectorConfig(plugins, connProps);
}
@@ -205,7 +220,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", ValueToKey.class.getName());
connProps.put("transforms.example.fields", "field");
- Plugins plugins = null; // Safe when we're only constructing the config
+ Plugins plugins = setupMockPlugins(new ValueToKey());
new ConnectorConfig(plugins, connProps);
}