[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1246751378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -104,6 +81,33 @@ public DelegatingClassLoader(List pluginLocations) { this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } +public Set sources() { Review Comment:

I think it's fine in `PluginUtils` if we plan on using it in multiple places. Otherwise, we may want to downgrade its visibility and move it into the single class that uses it. Either way, thanks for the change--I think this is an improvement 👍

I'm not a huge fan of how we're using the `ClassLoaderFactory` class right now, though. I can see the value for it in the `Plugins` class when both methods are used, but requiring an instance of it in `PluginUtils::pluginSources` seems like overkill since we don't need access to the `newDelegatingClassLoader` method. Could we create a separate `PluginClassLoaderFactory` interface, have `ClassLoaderFactory` implement that interface, and change the signature of `PluginUtils::pluginSources` to accept an instance of that interface instead of a `ClassLoaderFactory`?

Also, it may be a little unclear to first-time readers why we have the separate `ClassLoaderFactory` class since it tracks no (instance or static) state and seems at first glance like all of its logic might be a better fit for the `PluginUtils` class. Can we document in that class that its purpose is to provide a layer of indirection for the purpose of easier mocking in tests?

Finally, we don't have to copy the visibility of the methods that we've extracted into the `ClassLoaderFactory` class; IMO both of those can and should be made `public`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
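A minimal sketch of the interface split requested in the review comment above, under stated assumptions. The names `PluginClassLoaderFactory`, `ClassLoaderFactory`, `newPluginClassLoader`, `newDelegatingClassLoader`, and `pluginSources` come from the thread; everything else is hypothetical. In particular, `URLClassLoader` stands in for the real `PluginClassLoader` and `DelegatingClassLoader`, the `newDelegatingClassLoader` signature is guessed, and `pluginSources` is simplified to return a list of loaders rather than the PR's plugin sources.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PluginSourcesSketch {

    // Narrow interface: the only capability pluginSources needs.
    public interface PluginClassLoaderFactory {
        ClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent);
    }

    // Stateless on purpose: a layer of indirection so tests can swap in a mock.
    public static class ClassLoaderFactory implements PluginClassLoaderFactory {
        @Override
        public ClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
            // Assumption: URLClassLoader stands in for the real PluginClassLoader.
            return new URLClassLoader(urls, parent);
        }

        // Assumed signature; only callers that build the delegating loader need this.
        public ClassLoader newDelegatingClassLoader(ClassLoader parent) {
            return new URLClassLoader(new URL[0], parent);
        }
    }

    // Depends on the narrow interface, so a lambda is enough in tests.
    public static List<ClassLoader> pluginSources(
            List<Path> pluginLocations, ClassLoader parent, PluginClassLoaderFactory factory) {
        List<ClassLoader> loaders = new ArrayList<>();
        for (Path location : pluginLocations) {
            try {
                URL url = location.toUri().toURL();
                loaders.add(factory.newPluginClassLoader(url, new URL[] {url}, parent));
            } catch (MalformedURLException e) {
                // Skip the invalid entry instead of failing the whole scan.
                System.err.println("Invalid path in plugin path: " + location + ". Ignoring.");
            }
        }
        return loaders;
    }

    public static void main(String[] args) {
        List<Path> locations = Arrays.asList(Paths.get("plugin-a"), Paths.get("plugin-b"));
        ClassLoader parent = PluginSourcesSketch.class.getClassLoader();
        // Production-style call with the full factory.
        System.out.println(pluginSources(locations, parent, new ClassLoaderFactory()).size());
        // Test-style call: a lambda replaces the factory without any mocking library.
        System.out.println(pluginSources(locations, parent, (loc, urls, p) -> p).size());
    }
}
```

Because the interface has a single method, a test can pass a lambda and never touch `newDelegatingClassLoader`, which is the coupling the comment objects to.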
[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1246751378 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -104,6 +81,33 @@ public DelegatingClassLoader(List pluginLocations) { this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } +public Set sources() { Review Comment:

I think it's fine in `PluginUtils` if we plan on using it in multiple places. Otherwise, we may want to downgrade its visibility and move it into the single class that uses it. Either way, thanks for the change--I think this is an improvement 👍

I'm not a huge fan of how we're using the `ClassLoaderFactory` class right now, though. I can see the value for it in the `Plugins` class when both methods are used, but requiring an instance of it in `PluginUtils::pluginSources` seems like overkill since we don't need access to the `newDelegatingClassLoader` method. Could we create a separate `PluginClassLoaderFactory` interface, have `ClassLoaderFactory` implement that interface, and change the signature of `PluginUtils::pluginSources` to accept an instance of that interface instead of a `ClassLoaderFactory`?

Also, it may be a little unclear to first-time readers why we have the separate `ClassLoaderFactory` class since it tracks no (instance or static) state and seems at first glance like all of its logic might be a better fit for the `PluginUtils` class. Can we document in that class that its purpose is to provide a layer of indirection for the purpose of easier mocking in tests?

Finally, we don't have to copy the visibility of the methods that we've extracted into the `ClassLoaderFactory` class; IMO both of those can and should be made `public`.
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java: ## @@ -325,6 +328,34 @@ public static List pluginUrls(Path topPath) throws IOException { return Arrays.asList(archives.toArray(new Path[0])); } +public static Set pluginSources(List pluginLocations, DelegatingClassLoader classLoader, ClassLoaderFactory factory) { Review Comment:

Any reason not to generalize the signature, since we don't use any methods specific to the `DelegatingClassLoader` class?

```suggestion
public static Set pluginSources(List pluginLocations, ClassLoader classLoader, ClassLoaderFactory factory) {
```
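To illustrate why the wider parameter type costs nothing here, the following is a small sketch and not code from the PR. `DelegatingLoaderStandIn` and `newChildLoader` are hypothetical names; the stand-in plays the role of `DelegatingClassLoader`, and the method only uses its argument as a parent loader, which is behavior every `ClassLoader` provides.

```java
import java.net.URL;
import java.net.URLClassLoader;

class GeneralizedSignatureSketch {

    // Hypothetical stand-in for a specialized loader such as DelegatingClassLoader.
    static class DelegatingLoaderStandIn extends URLClassLoader {
        DelegatingLoaderStandIn(ClassLoader parent) {
            super(new URL[0], parent);
        }
    }

    // The argument is only used as a parent, so the base type is sufficient.
    static ClassLoader newChildLoader(ClassLoader classLoader) {
        return new URLClassLoader(new URL[0], classLoader);
    }

    public static void main(String[] args) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // Existing callers that hold the specialized type keep compiling unchanged.
        ClassLoader viaSpecialized = newChildLoader(new DelegatingLoaderStandIn(system));
        // New callers, tests included, can pass any loader at all.
        ClassLoader viaPlain = newChildLoader(system);
        System.out.println(viaSpecialized.getParent().getClass().getSimpleName());
        System.out.println(viaPlain.getParent() == system);
    }
}
```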
[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1246710694 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.components.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.sql.Driver; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +public abstract class PluginScanner { + +private static final Logger log = LoggerFactory.getLogger(PluginScanner.class); + +public PluginScanResult discoverPlugins(Set sources) { +long startMs = System.currentTimeMillis(); +List results = new ArrayList<>(); +for (PluginSource source : sources) { +results.add(scanUrlsAndAddPlugins(source)); +} +long endMs = System.currentTimeMillis(); +log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs); +return new PluginScanResult(results); +} + +private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) { +PluginScanResult plugins = scanPlugins(source); +loadJdbcDrivers(source.loader()); +return plugins; +} + +protected abstract PluginScanResult scanPlugins(PluginSource source); + +private void loadJdbcDrivers(final ClassLoader loader) { +// Apply here what java.sql.DriverManager does to discover and register classes +// implementing the java.sql.Driver interface. 
+AccessController.doPrivileged( +(PrivilegedAction) () -> { +ServiceLoader loadedDrivers = ServiceLoader.load( +Driver.class, +loader +); +Iterator driversIterator = loadedDrivers.iterator(); +try { +while (driversIterator.hasNext()) { +Driver driver = driversIterator.next(); +log.debug( +"Registered java.sql.Driver: {} to java.sql.DriverManager", +driver +); +} +} catch (Throwable t) { +log.debug( +"Ignoring java.sql.Driver classes listed in resources but not" ++ " present in class loader's classpath: ", +t +); +} +return null; +} +); +} + +@SuppressWarnings({"rawtypes", "unchecked"}) +protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { +return new PluginDesc(plugin, version, loader); +} + +@SuppressWarnings("unchecked") +protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { +SortedSet> result = new TreeSet<>(); +ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); +for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { +try (LoaderSwap loaderSwap = withClassLoader(loader)) { +T pluginImpl; +try { +pluginImpl = iterator.next(); +} catch (ServiceConfigurationError t) { +log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); +continue; +} +Class pluginKlass = (Class) pluginImpl.getClass(); +if (pluginKlass.getClassLoader() != loader) { +log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", +
[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1246708497 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader( ); } -public PluginScanResult initLoaders() { -List results = new ArrayList<>(); -for (Path pluginLocation : pluginLocations) { -try { -results.add(registerPlugin(pluginLocation)); -} catch (InvalidPathException | MalformedURLException e) { -log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); -} catch (IOException e) { -log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e); -} -} -// Finally add parent/system loader. -results.add(scanUrlsAndAddPlugins( -getParent(), -ClasspathHelper.forJavaClassPath().toArray(new URL[0]) -)); -PluginScanResult scanResult = new PluginScanResult(results); -installDiscoveredPlugins(scanResult); -return scanResult; -} - -private PluginScanResult registerPlugin(Path pluginLocation) -throws IOException { -log.info("Loading plugin from: {}", pluginLocation); -List pluginUrls = new ArrayList<>(); -for (Path path : PluginUtils.pluginUrls(pluginLocation)) { -pluginUrls.add(path.toUri().toURL()); -} -URL[] urls = pluginUrls.toArray(new URL[0]); -if (log.isDebugEnabled()) { -log.debug("Loading plugin urls: {}", Arrays.toString(urls)); -} -PluginClassLoader loader = newPluginClassLoader( -pluginLocation.toUri().toURL(), -urls, -this -); -return scanUrlsAndAddPlugins(loader, urls); -} - -private PluginScanResult scanUrlsAndAddPlugins( -ClassLoader loader, -URL[] urls -) { -PluginScanResult plugins = scanPluginPath(loader, urls); -log.info("Registered loader: {}", loader); Review Comment: 💯 Thanks!
[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner
C0urante commented on code in PR #13821: URL: https://github.com/apache/kafka/pull/13821#discussion_r1242437655 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.components.Versioned; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.sql.Driver; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +public abstract class PluginScanner { + +private static final Logger log = LoggerFactory.getLogger(PluginScanner.class); + +public PluginScanResult discoverPlugins(Set sources) { +long startMs = System.currentTimeMillis(); +List results = new ArrayList<>(); +for (PluginSource source : sources) { +results.add(scanUrlsAndAddPlugins(source)); +} +long endMs = System.currentTimeMillis(); +log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs); +return new PluginScanResult(results); +} + +private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) { +PluginScanResult plugins = scanPlugins(source); +loadJdbcDrivers(source.loader()); +return plugins; +} + +protected abstract PluginScanResult scanPlugins(PluginSource source); + +private void loadJdbcDrivers(final ClassLoader loader) { +// Apply here what java.sql.DriverManager does to discover and register classes +// implementing the java.sql.Driver interface. 
+AccessController.doPrivileged( +(PrivilegedAction) () -> { +ServiceLoader loadedDrivers = ServiceLoader.load( +Driver.class, +loader +); +Iterator driversIterator = loadedDrivers.iterator(); +try { +while (driversIterator.hasNext()) { +Driver driver = driversIterator.next(); +log.debug( +"Registered java.sql.Driver: {} to java.sql.DriverManager", +driver +); +} +} catch (Throwable t) { +log.debug( +"Ignoring java.sql.Driver classes listed in resources but not" ++ " present in class loader's classpath: ", +t +); +} +return null; +} +); +} + +@SuppressWarnings({"rawtypes", "unchecked"}) +protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { +return new PluginDesc(plugin, version, loader); +} + +@SuppressWarnings("unchecked") +protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { +SortedSet> result = new TreeSet<>(); +ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); +for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { +try (LoaderSwap loaderSwap = withClassLoader(loader)) { +T pluginImpl; +try { +pluginImpl = iterator.next(); +} catch (ServiceConfigurationError t) { +log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); +continue; +} +Class pluginKlass = (Class) pluginImpl.getClass(); +if (pluginKlass.getClassLoader() != loader) { +log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", +