C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242437655


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+    public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+        long startMs = System.currentTimeMillis();
+        List<PluginScanResult> results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        PluginScanResult plugins = scanPlugins(source);
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and 
register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction<Void>) () -> {
+                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                        Driver.class,
+                        loader
+                );
+                Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                                "Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+                                driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                            "Ignoring java.sql.Driver classes listed in 
resources but not"
+                                    + " present in class loader's classpath: ",
+                            t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                    continue;
+                }
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+            }
+        }
+        return result;
+    }
+
+    protected static <T> String versionFor(T pluginImpl) {
+        try {
+            if (pluginImpl instanceof Versioned) {
+                return ((Versioned) pluginImpl).version();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
+        }
+        return DelegatingClassLoader.UNDEFINED_VERSION;

Review Comment:
   Nit: since this constant isn't used in `DelegatingClassLoader` anymore, can 
we move it somewhere else (probably `PluginScanner`)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+    public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+        long startMs = System.currentTimeMillis();
+        List<PluginScanResult> results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        PluginScanResult plugins = scanPlugins(source);
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and 
register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction<Void>) () -> {
+                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                        Driver.class,
+                        loader
+                );
+                Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                                "Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+                                driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                            "Ignoring java.sql.Driver classes listed in 
resources but not"
+                                    + " present in class loader's classpath: ",
+                            t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                    continue;
+                }
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+            }
+        }
+        return result;
+    }
+
+    protected static <T> String versionFor(T pluginImpl) {
+        try {
+            if (pluginImpl instanceof Versioned) {
+                return ((Versioned) pluginImpl).version();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
+        }
+        return DelegatingClassLoader.UNDEFINED_VERSION;
+    }
+
+    protected String reflectiveErrorDescription(Throwable t) {

Review Comment:
   Nit: this was `static` before, which is a bit better since we never need to 
mock it and it demonstrates that the method doesn't rely on instance state.
   ```suggestion
       protected static String reflectiveErrorDescription(Throwable t) {
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ReflectionScanner extends PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ReflectionScanner.class);
+
+    public static <T> String versionFor(Class<? extends T> pluginKlass) throws 
ReflectiveOperationException {
+        T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
+        return versionFor(pluginImpl);
+    }
+
+    protected PluginScanResult scanPlugins(PluginSource source) {

Review Comment:
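   Nit: this method implements the abstract `PluginScanner#scanPlugins`, so it
should be annotated with `@Override`: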
   ```suggestion
       @Override
       protected PluginScanResult scanPlugins(PluginSource source) {
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader(
         );
     }
 
-    public PluginScanResult initLoaders() {
-        List<PluginScanResult> results = new ArrayList<>();
-        for (Path pluginLocation : pluginLocations) {
-            try {
-                results.add(registerPlugin(pluginLocation));
-            } catch (InvalidPathException | MalformedURLException e) {
-                log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
-            } catch (IOException e) {
-                log.error("Could not get listing for plugin path: {}. 
Ignoring.", pluginLocation, e);
-            }
-        }
-        // Finally add parent/system loader.
-        results.add(scanUrlsAndAddPlugins(
-                getParent(),
-                ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        ));
-        PluginScanResult scanResult = new PluginScanResult(results);
-        installDiscoveredPlugins(scanResult);
-        return scanResult;
-    }
-
-    private PluginScanResult registerPlugin(Path pluginLocation)
-        throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);
-        List<URL> pluginUrls = new ArrayList<>();
-        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-            pluginUrls.add(path.toUri().toURL());
-        }
-        URL[] urls = pluginUrls.toArray(new URL[0]);
-        if (log.isDebugEnabled()) {
-            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-        }

Review Comment:
   Can we preserve this debug-level log message (the "Loading plugin urls"
listing)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ReflectionScanner extends PluginScanner {

Review Comment:
   Can we get a brief Javadoc on this class?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java:
##########
@@ -39,7 +40,9 @@ public void testLoadingUnloadedPluginClass() {
                 Collections.emptyList(),
                 DelegatingClassLoader.class.getClassLoader()
         );
-        classLoader.initLoaders();
+        Set<PluginSource> classLoaders = classLoader.sources();
+        PluginScanResult scanResult = new 
ReflectionScanner().discoverPlugins(classLoaders);
+        classLoader.installDiscoveredPlugins(scanResult);

Review Comment:
   These lines are repeated a few times throughout this test suite; worth 
pulling out into a reusable method?
   
   ```java
   private void initLoaders(DelegatingClassLoader classLoader) {
       Set<PluginSource> classLoaders = classLoader.sources();
       PluginScanResult scanResult = new 
ReflectionScanner().discoverPlugins(classLoaders);
       classLoader.installDiscoveredPlugins(scanResult);
   }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -104,6 +81,33 @@ public DelegatingClassLoader(List<Path> pluginLocations) {
         this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
     }
 
+    public Set<PluginSource> sources() {
+        Set<PluginSource> pluginLoaders = new HashSet<>();

Review Comment:
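   Nit: this set holds `PluginSource` instances rather than loaders, so the
variable name should reflect that: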
   ```suggestion
           Set<PluginSource> pluginSources = new HashSet<>();
   ```
   
   (Or just `result` would be fine too)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.SortedSet;
+
+public class ServiceLoaderScanner extends PluginScanner {
+    @Override

Review Comment:
   Nit: add a blank line between the class declaration and the first method:
   ```suggestion
   
           @Override
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -104,6 +81,33 @@ public DelegatingClassLoader(List<Path> pluginLocations) {
         this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
     }
 
+    public Set<PluginSource> sources() {

Review Comment:
   Since this is only used in one non-testing place right now we can leave it 
as-is, but I'm wondering if we might extract this logic into a common place in 
a subsequent PR.
   
   I see that the description explains how this logic is dependent on the 
`ClassLoader` used to load plugins, but if that's the only dependency, would it 
be enough to add a `ClassLoader` parameter to this method (along with, 
presumably, a `List<Path>` for the to-be-scanned plugin locations)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader(
         );
     }
 
-    public PluginScanResult initLoaders() {
-        List<PluginScanResult> results = new ArrayList<>();
-        for (Path pluginLocation : pluginLocations) {
-            try {
-                results.add(registerPlugin(pluginLocation));
-            } catch (InvalidPathException | MalformedURLException e) {
-                log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
-            } catch (IOException e) {
-                log.error("Could not get listing for plugin path: {}. 
Ignoring.", pluginLocation, e);
-            }
-        }
-        // Finally add parent/system loader.
-        results.add(scanUrlsAndAddPlugins(
-                getParent(),
-                ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        ));
-        PluginScanResult scanResult = new PluginScanResult(results);
-        installDiscoveredPlugins(scanResult);
-        return scanResult;
-    }
-
-    private PluginScanResult registerPlugin(Path pluginLocation)
-        throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);
-        List<URL> pluginUrls = new ArrayList<>();
-        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-            pluginUrls.add(path.toUri().toURL());
-        }
-        URL[] urls = pluginUrls.toArray(new URL[0]);
-        if (log.isDebugEnabled()) {
-            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-        }
-        PluginClassLoader loader = newPluginClassLoader(
-                pluginLocation.toUri().toURL(),
-                urls,
-                this
-        );
-        return scanUrlsAndAddPlugins(loader, urls);
-    }
-
-    private PluginScanResult scanUrlsAndAddPlugins(
-            ClassLoader loader,
-            URL[] urls
-    ) {
-        PluginScanResult plugins = scanPluginPath(loader, urls);
-        log.info("Registered loader: {}", loader);

Review Comment:
   Can we preserve this log message, since the underlying logic of creating a 
new classloader per plugin location remains unchanged?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+    private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+    public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+        long startMs = System.currentTimeMillis();
+        List<PluginScanResult> results = new ArrayList<>();
+        for (PluginSource source : sources) {
+            results.add(scanUrlsAndAddPlugins(source));
+        }
+        long endMs = System.currentTimeMillis();
+        log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+        return new PluginScanResult(results);
+    }
+
+    private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+        PluginScanResult plugins = scanPlugins(source);
+        loadJdbcDrivers(source.loader());
+        return plugins;
+    }
+
+    protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        // Apply here what java.sql.DriverManager does to discover and 
register classes
+        // implementing the java.sql.Driver interface.
+        AccessController.doPrivileged(
+            (PrivilegedAction<Void>) () -> {
+                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                        Driver.class,
+                        loader
+                );
+                Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                try {
+                    while (driversIterator.hasNext()) {
+                        Driver driver = driversIterator.next();
+                        log.debug(
+                                "Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+                                driver
+                        );
+                    }
+                } catch (Throwable t) {
+                    log.debug(
+                            "Ignoring java.sql.Driver classes listed in 
resources but not"
+                                    + " present in class loader's classpath: ",
+                            t
+                    );
+                }
+                return null;
+            }
+        );
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
+        return new PluginDesc(plugin, version, loader);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+        SortedSet<PluginDesc<T>> result = new TreeSet<>();
+        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+                T pluginImpl;
+                try {
+                    pluginImpl = iterator.next();
+                } catch (ServiceConfigurationError t) {
+                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    continue;
+                }
+                Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
+                if (pluginKlass.getClassLoader() != loader) {
+                    log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                    continue;
+                }
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+            }
+        }
+        return result;
+    }
+
+    protected static <T> String versionFor(T pluginImpl) {
+        try {
+            if (pluginImpl instanceof Versioned) {
+                return ((Versioned) pluginImpl).version();
+            }
+        } catch (Throwable t) {
+            log.error("Failed to get plugin version for " + 
pluginImpl.getClass(), t);
+        }
+        return DelegatingClassLoader.UNDEFINED_VERSION;
+    }
+
+    protected String reflectiveErrorDescription(Throwable t) {
+        if (t instanceof NoSuchMethodException) {
+            return ": Plugin class must have a no-args constructor, and cannot 
be a non-static inner class";
+        } else if (t instanceof SecurityException) {
+            return ": Security settings must allow reflective instantiation of 
plugin classes";
+        } else if (t instanceof IllegalAccessException) {
+            return ": Plugin class default constructor must be public";
+        } else if (t instanceof ExceptionInInitializerError) {
+            return ": Failed to statically initialize plugin class";
+        } else if (t instanceof InvocationTargetException) {
+            return ": Failed to invoke plugin constructor";
+        } else {
+            return "";
+        }
+    }
+
+    public LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   Nit: doesn't have to be public
   ```suggestion
       protected LoaderSwap withClassLoader(ClassLoader loader) {
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader(
         );
     }
 
-    public PluginScanResult initLoaders() {
-        List<PluginScanResult> results = new ArrayList<>();
-        for (Path pluginLocation : pluginLocations) {
-            try {
-                results.add(registerPlugin(pluginLocation));
-            } catch (InvalidPathException | MalformedURLException e) {
-                log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
-            } catch (IOException e) {
-                log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
-            }
-        }
-        // Finally add parent/system loader.
-        results.add(scanUrlsAndAddPlugins(
-                getParent(),
-                ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-        ));
-        PluginScanResult scanResult = new PluginScanResult(results);
-        installDiscoveredPlugins(scanResult);
-        return scanResult;
-    }
-
-    private PluginScanResult registerPlugin(Path pluginLocation)
-        throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);

Review Comment:
   Can we preserve this log message?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
+import org.apache.kafka.connect.rest.ConnectRestExtension;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.SortedSet;
+
+public class ServiceLoaderScanner extends PluginScanner {

Review Comment:
   Can we get a brief Javadoc on this class?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {

Review Comment:
   Can we get some Javadocs on this class and its public and protected methods? 
Trying to reduce the bus factor in case someone else needs to look at this 
later.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -77,6 +77,13 @@ protected DelegatingClassLoader newDelegatingClassLoader(final List<Path> plugin
         );
     }
 
+    public PluginScanResult initLoaders() {

Review Comment:
   Nit: can be private
   ```suggestion
       private PluginScanResult initLoaders() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to