[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-29 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1246751378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -104,6 +81,33 @@ public DelegatingClassLoader(List<Path> pluginLocations) {
         this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
     }
 
+    public Set<PluginSource> sources() {

Review Comment:
   I think it's fine in `PluginUtils` if we plan on using it in multiple 
places. Otherwise, we may want to downgrade its visibility and move it into the 
single class that uses it. Either way, thanks for the change; I think this is 
an improvement 👍
   
   I'm not a huge fan of how we're using the `ClassLoaderFactory` class right 
now, though. I can see the value for it in the `Plugins` class when both 
methods are used, but requiring an instance of it in 
`PluginUtils::pluginSources` seems like overkill since we don't need access to 
the `newDelegatingClassLoader` method.
   
   Could we create a separate `PluginClassLoaderFactory` interface, have 
`ClassLoaderFactory` implement that interface, and change the signature of 
`PluginUtils::pluginSources` to accept an instance of that interface instead of 
a `ClassLoaderFactory`?
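
   A rough sketch of the proposed split, assuming the factory method keeps the `(URL pluginLocation, URL[] urls, ClassLoader parent)` shape that `newPluginClassLoader` has in `DelegatingClassLoader`. `PluginClassLoader` and `DelegatingClassLoader` below are simplified stand-ins, and `PluginUtilsSketch.pluginLoaders` is a hypothetical placeholder for `PluginUtils::pluginSources`, not the real method:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Connect runtime loaders; NOT the real implementations.
class PluginClassLoader extends URLClassLoader {
    final URL location;

    PluginClassLoader(URL location, URL[] urls, ClassLoader parent) {
        super(urls, parent);
        this.location = location;
    }
}

class DelegatingClassLoader extends URLClassLoader {
    DelegatingClassLoader(ClassLoader parent) {
        super(new URL[0], parent);
    }
}

// The narrow interface: only the method that plugin-source discovery needs.
interface PluginClassLoaderFactory {
    PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent);
}

// The wider factory keeps both methods for callers (such as Plugins) that use both.
class ClassLoaderFactory implements PluginClassLoaderFactory {
    public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
        return new DelegatingClassLoader(parent);
    }

    @Override
    public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
        return new PluginClassLoader(pluginLocation, urls, parent);
    }
}

final class PluginUtilsSketch {
    private PluginUtilsSketch() {
    }

    // Hypothetical stand-in for PluginUtils::pluginSources: it depends only on the
    // narrow interface, so callers never have to supply newDelegatingClassLoader.
    static List<PluginClassLoader> pluginLoaders(
            List<URL> pluginLocations, ClassLoader parent, PluginClassLoaderFactory factory) {
        List<PluginClassLoader> loaders = new ArrayList<>();
        for (URL location : pluginLocations) {
            loaders.add(factory.newPluginClassLoader(location, new URL[] {location}, parent));
        }
        return loaders;
    }
}
```

   Since the narrow interface has a single abstract method, a test could even pass a lambda where the full `ClassLoaderFactory` used to be required.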
   
   Also, it may be a little unclear to first-time readers why we have the 
separate `ClassLoaderFactory` class since it tracks no (instance or static) 
state and seems at first glance like all of its logic might be a better fit for 
the `PluginUtils` class. Can we document in that class that it exists to 
provide a layer of indirection that makes mocking easier in tests?
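
   A minimal sketch of what that class-level documentation, and the test-only substitution it describes, might look like. `PluginsSketch` and `RecordingClassLoaderFactory` are hypothetical names, and `DelegatingClassLoader` is a simplified stand-in for the real class:

```java
import java.net.URL;
import java.net.URLClassLoader;

// Simplified stand-in; NOT the real Connect DelegatingClassLoader.
class DelegatingClassLoader extends URLClassLoader {
    DelegatingClassLoader(ClassLoader parent) {
        super(new URL[0], parent);
    }
}

/**
 * Factory for the class loaders used by the Connect runtime.
 * <p>
 * This class tracks no state, and its logic could live in static helpers. It exists
 * only as a layer of indirection so that tests can substitute their own loader creation.
 */
class ClassLoaderFactory {
    public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
        return new DelegatingClassLoader(parent);
    }
}

// Hypothetical consumer standing in for the Plugins class.
class PluginsSketch {
    final DelegatingClassLoader delegatingLoader;

    PluginsSketch(ClassLoaderFactory factory) {
        this.delegatingLoader = factory.newDelegatingClassLoader(PluginsSketch.class.getClassLoader());
    }
}

// Hand-written test double; a mocking library could play the same role.
class RecordingClassLoaderFactory extends ClassLoaderFactory {
    ClassLoader lastParent;

    @Override
    public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
        lastParent = parent;
        return super.newDelegatingClassLoader(parent);
    }
}
```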
   
   Finally, we don't have to copy the visibility of the methods that we've 
extracted into the `ClassLoaderFactory` class; IMO both of those can and should 
be made `public`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-29 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1246751378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##
@@ -325,6 +328,34 @@ public static List<Path> pluginUrls(Path topPath) throws IOException {
         return Arrays.asList(archives.toArray(new Path[0]));
     }
 
+    public static Set<PluginSource> pluginSources(List<Path> pluginLocations, DelegatingClassLoader classLoader, ClassLoaderFactory factory) {

Review Comment:
   Any reason not to generalize the signature, since we don't use any methods 
specific to the `DelegatingClassLoader` class?
   ```suggestion
       public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassLoader classLoader, ClassLoaderFactory factory) {
   ```
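
   To illustrate why the broader type is enough: the hypothetical helper `PluginUtilsSketch.parentsFor` below calls only `ClassLoader#getParent`, so it can accept any loader. It is not the real `pluginSources` body, and `DelegatingClassLoader` is a simplified stand-in:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in; NOT the real Connect DelegatingClassLoader.
class DelegatingClassLoader extends URLClassLoader {
    DelegatingClassLoader(ClassLoader parent) {
        super(new URL[0], parent);
    }
}

final class PluginUtilsSketch {
    private PluginUtilsSketch() {
    }

    // Hypothetical helper: maps each location to the loader given by the caller, plus a
    // "classpath" entry for that loader's parent. Nothing here is specific to
    // DelegatingClassLoader, so the parameter can be the general ClassLoader type.
    static Map<String, ClassLoader> parentsFor(List<String> pluginLocations, ClassLoader classLoader) {
        Map<String, ClassLoader> parents = new LinkedHashMap<>();
        for (String location : pluginLocations) {
            parents.put(location, classLoader);
        }
        // Uses only ClassLoader#getParent, which every loader provides.
        parents.put("classpath", classLoader.getParent());
        return parents;
    }
}
```

   With the general signature, callers and tests can pass a `DelegatingClassLoader` or any other loader without changing the helper.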




[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-29 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1246710694


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+    private static final Logger log = LoggerFactory.getLogger(PluginScanner.class);
+
+    public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+        long startMs = System.currentTimeMillis();
+        List<PluginScanResult> results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        PluginScanResult plugins = scanPlugins(source);
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction<Void>) () -> {
+                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                        Driver.class,
+                        loader
+                );
+                Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                                "Registered java.sql.Driver: {} to java.sql.DriverManager",
+                                driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                            "Ignoring java.sql.Driver classes listed in resources but not"
+                                    + " present in class loader's classpath: ",
+                            t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
+

[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-29 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1246708497


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader(
         );
     }
 
-    public PluginScanResult initLoaders() {
-        List<PluginScanResult> results = new ArrayList<>();
-        for (Path pluginLocation : pluginLocations) {
-            try {
-                results.add(registerPlugin(pluginLocation));
-            } catch (InvalidPathException | MalformedURLException e) {
-                log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
-            } catch (IOException e) {
-                log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
-            }
-        }
-        // Finally add parent/system loader.
-        results.add(scanUrlsAndAddPlugins(
-                getParent(),
-                ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        ));
-        PluginScanResult scanResult = new PluginScanResult(results);
-        installDiscoveredPlugins(scanResult);
-        return scanResult;
-    }
-
-    private PluginScanResult registerPlugin(Path pluginLocation)
-            throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);
-        List<URL> pluginUrls = new ArrayList<>();
-        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-            pluginUrls.add(path.toUri().toURL());
-        }
-        URL[] urls = pluginUrls.toArray(new URL[0]);
-        if (log.isDebugEnabled()) {
-            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-        }
-        PluginClassLoader loader = newPluginClassLoader(
-                pluginLocation.toUri().toURL(),
-                urls,
-                this
-        );
-        return scanUrlsAndAddPlugins(loader, urls);
-    }
-
-    private PluginScanResult scanUrlsAndAddPlugins(
-            ClassLoader loader,
-            URL[] urls
-    ) {
-        PluginScanResult plugins = scanPluginPath(loader, urls);
-        log.info("Registered loader: {}", loader);

Review Comment:
   💯 Thanks!





[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-26 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242437655

