snehashisp commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870906495
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -69,36 +70,109 @@ public DelegatingClassLoader() {
/**
* Retrieve the PluginClassLoader associated with a plugin class
+ *
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null
if the plugin is not isolated.
*/
// VisibleForTesting
- PluginClassLoader pluginClassLoader(String name) {
+ PluginClassLoader pluginClassLoader(String name, VersionRange range) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}
+
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
- ClassLoader pluginLoader = inner.get(inner.lastKey());
+
+
+ ClassLoader pluginLoader = findPluginLoader(inner, name, range);
return pluginLoader instanceof PluginClassLoader
- ? (PluginClassLoader) pluginLoader
- : null;
+ ? (PluginClassLoader) pluginLoader
+ : null;
+ }
+
+ PluginClassLoader pluginClassLoader(String name) {
+ return pluginClassLoader(name, null);
+ }
+
+ ClassLoader connectorLoader(String connectorClassOrAlias, VersionRange
range) throws ClassNotFoundException {
+ String fullName = aliases.getOrDefault(connectorClassOrAlias,
connectorClassOrAlias);
+ // if the plugin is not loaded via the plugin classloader, it might
still be available in the parent delegating
+ // classloader, in order to check if the version satisfies the
requirement we need to load the plugin class here
+ ClassLoader classLoader = loadVersionedPluginClass(fullName, range,
false).getClassLoader();
+ log.debug(
+ "Got plugin class loader: '{}' for connector: {}",
+ classLoader,
+ connectorClassOrAlias
+ );
+ return classLoader;
}
ClassLoader connectorLoader(String connectorClassOrAlias) {
String fullName = aliases.getOrDefault(connectorClassOrAlias,
connectorClassOrAlias);
ClassLoader classLoader = pluginClassLoader(fullName);
if (classLoader == null) classLoader = this;
log.debug(
- "Getting plugin class loader: '{}' for connector: {}",
- classLoader,
- connectorClassOrAlias
+ "Getting plugin class loader: '{}' for connector: {}",
+ classLoader,
+ connectorClassOrAlias
);
return classLoader;
}
+ String resolveFullClassName(String classOrAlias) {
+ return aliases.getOrDefault(classOrAlias, classOrAlias);
+ }
+
+ String latestVersion(String classOrAlias) {
+ if (classOrAlias == null) {
+ return null;
+ }
+ String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
+ SortedMap<PluginDesc<?>, ClassLoader> inner =
pluginLoaders.get(fullName);
+ if (inner == null) {
+ return null;
+ }
+ return inner.lastKey().version();
+ }
+
+ private ClassLoader findPluginLoader(
+ SortedMap<PluginDesc<?>, ClassLoader> loaders,
+ String pluginName,
+ VersionRange range
+ ) {
+
+ if (range != null) {
+
+ ArtifactVersion version = range.getRecommendedVersion();
Review Comment:
I have opted to throw an exception here, as we should not be getting a soft
requirement here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]