snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870737937
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey,
String className, Map<Strin
* @throws ConnectException if the {@link HeaderConverter} implementation
class could not be found
*/
public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, ClassLoaderUsage classLoaderUsage) {
- Class<? extends HeaderConverter> klass = null;
+ return getHeaderConverter(config, classPropertyName, null,
classLoaderUsage);
+ }
+
+ public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName) {
+ ClassLoaderUsage classLoader = config.getString(versionPropertyName)
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+ return getHeaderConverter(config, classPropertyName,
versionPropertyName, classLoader);
+ }
+
+ private HeaderConverter getHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName, ClassLoaderUsage
classLoaderUsage) {
+ if (!config.originals().containsKey(classPropertyName)) {
+ // This configuration does not define the Header Converter via the
specified property name
+ return null;
+ }
+
+ HeaderConverter plugin = getVersionedPlugin(config, classPropertyName,
versionPropertyName,
+ HeaderConverter.class, classLoaderUsage,
scanResult.headerConverters());
+
+ String configPrefix = classPropertyName + ".";
+ Map<String, Object> converterConfig =
config.originalsWithPrefix(configPrefix);
+ converterConfig.put(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName());
+ log.debug("Configuring the header converter with configuration
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+ try (LoaderSwap loaderSwap =
withClassLoader(plugin.getClass().getClassLoader())) {
+ plugin.configure(converterConfig);
+ }
+ return plugin;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private <U> U getVersionedPlugin(
+ AbstractConfig config,
+ String classPropertyName,
+ String versionPropertyName,
+ Class basePluginClass,
+ ClassLoaderUsage classLoaderUsage,
+ SortedSet<PluginDesc<U>> availablePlugins
+ ) {
+
+ String version = versionPropertyName == null ? null :
config.getString(versionPropertyName);
+ VersionRange range = null;
+ if (version != null) {
+ try {
+ range = VersionRange.createFromVersionSpec(version);
+ } catch (InvalidVersionSpecificationException e) {
+ throw new ConnectException(String.format("Invalid version
range for %s: %s %s", classPropertyName, version, e));
+ }
+ }
+
+ Class<? extends U> klass = null;
+ String basePluginClassName = basePluginClass.getSimpleName();
switch (classLoaderUsage) {
case CURRENT_CLASSLOADER:
- if (!config.originals().containsKey(classPropertyName)) {
- // This connector configuration does not define the header
converter via the specified property name
- return null;
- }
// Attempt to load first with the current classloader, and
plugins as a fallback.
- // Note: we can't use config.getConfiguredInstance because we
have to remove the property prefixes
- // before calling config(...)
- klass = pluginClassFromConfig(config, classPropertyName,
HeaderConverter.class, scanResult.headerConverters());
+ // Note: we can't use config.getConfiguredInstance because
Converter doesn't implement Configurable, and even if it did
+ // we have to remove the property prefixes before calling
config(...) and we still always want to call Converter.config.
+ klass = pluginClassFromConfig(config, classPropertyName,
basePluginClass, availablePlugins);
break;
case PLUGINS:
- // Attempt to load with the plugin class loader, which uses
the current classloader as a fallback.
- // Note that there will always be at least a default header
converter for the worker
- String converterClassOrAlias =
config.getClass(classPropertyName).getName();
+ // Attempt to load with the plugin class loader, which uses
the current classloader as a fallback
+ String classOrAlias =
config.getClass(classPropertyName).getName();
try {
- klass = pluginClass(
- delegatingLoader,
- converterClassOrAlias,
- HeaderConverter.class
- );
+ klass = pluginClass(delegatingLoader, classOrAlias,
basePluginClass, range);
} catch (ClassNotFoundException e) {
throw new ConnectException(
- "Failed to find any class that implements
HeaderConverter and which name matches "
- + converterClassOrAlias
- + ", available header converters are: "
- +
pluginNames(scanResult.headerConverters())
+ "Failed to find any class that implements " +
basePluginClassName + " and which name matches "
+ + classOrAlias + ", available plugins are:
"
+ + pluginNames(availablePlugins)
);
}
+ break;
}
if (klass == null) {
- throw new ConnectException("Unable to initialize the
HeaderConverter specified in '" + classPropertyName + "'");
+ throw new ConnectException("Unable to initialize the '" +
basePluginClassName
+ + "' specified in '" + classPropertyName + "'");
}
- String configPrefix = classPropertyName + ".";
- Map<String, Object> converterConfig =
config.originalsWithPrefix(configPrefix);
- converterConfig.put(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName());
- log.debug("Configuring the header converter with configuration
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
-
- HeaderConverter plugin;
+ U plugin;
try (LoaderSwap loaderSwap = withClassLoader(klass.getClassLoader())) {
plugin = newPlugin(klass);
- plugin.configure(converterConfig);
+ DefaultArtifactVersion pluginVersion = new
DefaultArtifactVersion(PluginScanner.versionFor(plugin));
+ if (range != null && range.hasRestrictions() &&
!range.containsVersion(pluginVersion)) {
+ // this can happen if the current class loader is used
+ // if there are version restrictions then this should be
captured, and we should load using the plugin class loader
+ if (classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+ return getVersionedPlugin(config, classPropertyName,
versionPropertyName, basePluginClass, ClassLoaderUsage.PLUGINS,
availablePlugins);
+ }
+ }
Review Comment:
Yes, the situation you mentioned is exactly the reason why the public
methods to create a converter with a version provided we don't have the option
to pass in a class loader and defaults to using the plugins loader, otherwise
it can potentially shadow a higher version of a plugin requirement. This code
path should ideally never be executed. Will remove this and add the assertion
suggestion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]