[ https://issues.apache.org/jira/browse/KAFKA-6991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504217#comment-16504217 ]
ASF GitHub Bot commented on KAFKA-6991: --------------------------------------- ewencp closed pull request #5135: KAFKA-6991 : KIP-285 : Fix ServiceLoader issue with PluginClassLoader URL: https://github.com/apache/kafka/pull/5135 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 354e10650ee..f33342ecb08 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -129,7 +129,7 @@ do CLASSPATH="$CLASSPATH:$dir/*" done -for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" +for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools" "basic-auth-extension" do for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar; do diff --git a/build.gradle b/build.gradle index 4f3fd770b78..52ba463d4f1 100644 --- a/build.gradle +++ b/build.gradle @@ -507,7 +507,7 @@ for ( sv in availableScalaVersions ) { } } -def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file'] +def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension'] def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs /** Create one task per default Scala version */ @@ -728,6 +728,8 @@ project(':core') { from(project(':connect:json').configurations.runtime) { into("libs/") } from(project(':connect:file').jar) { into("libs/") } from(project(':connect:file').configurations.runtime) { into("libs/") } + from(project(':connect:basic-auth-extension').jar) { into("libs/") } + from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") } 
from(project(':streams').jar) { into("libs/") } from(project(':streams').configurations.runtime) { into("libs/") } from(project(':streams:streams-scala').jar) { into("libs/") } diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java similarity index 94% rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java index 91d5d9ca00c..4169e9eaea6 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/BasicAuthSecurityRestExtension.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.kafka.connect.rest.basic.auth.extenstion; +package org.apache.kafka.connect.rest.basic.auth.extension; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.rest.ConnectRestExtension; @@ -33,13 +33,13 @@ * * <p>To use this extension, one needs to add the following config in the {@code worker.properties} * <pre> - * rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension + * rest.extension.classes = org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension * </pre> * * <p> An example JAAS config would look as below * <Pre> * KafkaConnect { - * org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required + * org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required * file="/mnt/secret/credentials.properties"; * }; *</Pre> diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java similarity index 98% rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java index 7231700af7c..6167434b980 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilter.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.kafka.connect.rest.basic.auth.extenstion; +package org.apache.kafka.connect.rest.basic.auth.extension; import org.apache.kafka.common.config.ConfigException; diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java similarity index 98% rename from connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java rename to connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java index 7af7863b2ce..101c6f49d02 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extenstion/PropertyFileLoginModule.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/PropertyFileLoginModule.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.kafka.connect.rest.basic.auth.extenstion; +package org.apache.kafka.connect.rest.basic.auth.extension; import org.apache.kafka.common.config.ConfigException; import org.slf4j.Logger; diff --git a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension index 098c9473d05..ba7ae5b580d 100644 --- a/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension +++ b/connect/basic-auth-extension/src/main/resources/META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.kafka.connect.rest.basic.auth.extenstion.BasicAuthSecurityRestExtension \ No newline at end of file +org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension \ No newline at end of file diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java similarity index 98% rename from connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java rename to connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java index 80299f81c29..d61fc06cd90 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extenstion/JaasBasicAuthFilterTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.kafka.connect.rest.basic.auth.extenstion; +package org.apache.kafka.connect.rest.basic.auth.extension; import org.apache.kafka.common.security.JaasUtils; import org.easymock.EasyMock; @@ -155,7 +155,7 @@ private void setupJaasConfig(String loginModule, String credentialFilePath, bool List<String> lines; lines = new ArrayList<>(); - lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extenstion.PropertyFileLoginModule required "); + lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required "); if (includeFileOptions) { lines.add("file=\"" + credentialFilePath + "\""); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index fb9cae39a22..8e31220fb68 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -56,6 +57,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); @@ -72,6 +74,12 @@ private final List<String> pluginPaths; private final Map<Path, PluginClassLoader> activePaths; + private static final String MANIFEST_PREFIX = "META-INF/services/"; + private static final Class[] SERVICE_LOADER_PLUGINS = new Class[] {ConnectRestExtension.class, ConfigProvider.class}; + private static final Set<String> PLUGIN_MANIFEST_FILES = + 
Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName()) + .collect(Collectors.toSet()); + public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) { super(new URL[0], parent); this.pluginPaths = pluginPaths; @@ -324,12 +332,11 @@ private PluginScanResult scanPluginPath( return result; } - private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, - ClassLoader loader) { + private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) { ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader); Collection<PluginDesc<T>> result = new ArrayList<>(); - for (T impl : serviceLoader) { - result.add(new PluginDesc<>(klass, versionFor(impl), loader)); + for (T pluginImpl : serviceLoader) { + result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(), versionFor(pluginImpl), loader)); } return result; } @@ -407,4 +414,31 @@ protected void scan(URL url) { } } } + + @Override + public URL getResource(String name) { + if (serviceLoaderManifestForPlugin(name)) { + // Default implementation of getResource searches the parent class loader and if not available/found, its own URL paths. + // This will enable thePluginClassLoader to limit its resource search only to its own URL paths. + return null; + } else { + return super.getResource(name); + } + } + + @Override + public Enumeration<URL> getResources(String name) throws IOException { + if (serviceLoaderManifestForPlugin(name)) { + // Default implementation of getResources searches the parent class loader and and also its own URL paths. This will enable the + // PluginClassLoader to limit its resource search to only its own URL paths. 
+ return null; + } else { + return super.getResources(name); + } + } + + //Visible for testing + static boolean serviceLoaderManifestForPlugin(String name) { + return PLUGIN_MANIFEST_FILES.contains(name); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index f6c1185f189..b4aee4741c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -128,6 +128,7 @@ + "|file\\..*" + "|converters\\..*" + "|storage\\.StringConverter" + + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension" + "))$"; private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java new file mode 100644 index 00000000000..83ff0407a76 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DelegatingClassLoaderTest { + + @Test + public void testWhiteListedManifestResources() { + assertTrue( + DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.rest.ConnectRestExtension")); + assertTrue( + DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.common.config.ConfigProvider")); + } + + @Test + public void testOtherResources() { + assertFalse( + DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.transforms.Transformation")); + assertFalse(DelegatingClassLoader.serviceLoaderManifestForPlugin("resource/version.properties")); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index a5ab50afe0a..9698153f986 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -146,6 +146,9 @@ public void testAllowedConnectFrameworkClasses() throws Exception { assertTrue(PluginUtils.shouldLoadInIsolation( "org.apache.kafka.connect.storage.StringConverter") ); + assertTrue(PluginUtils.shouldLoadInIsolation( + 
"org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension" + )); } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect Rest Extension Plugin issue with Class Loader > ----------------------------------------------------- > > Key: KAFKA-6991 > URL: https://issues.apache.org/jira/browse/KAFKA-6991 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 2.0.0 > Reporter: Magesh kumar Nandakumar > Assignee: Magesh kumar Nandakumar > Priority: Major > Fix For: 2.0.0, 2.1.0 > > > The KAFKA-6776 implementation has two issues with the class loader. > # If a RestExtension plugin is available on the classpath, the plugin > gets associated with all the plugin class loaders. This happens because, > by default, the manifest resource is also searched for in the parent class > loader. > # The class name associated with the plugin class loader is incorrect. It > should be the implementation class instead of the interface. -- This message was sent by Atlassian JIRA (v7.6.3#76005)