C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1266782159


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));

Review Comment:
   We added the `handleLinkageError` method because calls to 
`Iterator::hasNext` and `Iterator::next` may advance past a bad plugin even 
when they throw a `LinkageError`, so retrying them can make progress. Do we 
have any reason to expect the same of `ServiceLoader::load` or 
`ServiceLoader::iterator`? If not, IMO we should leave these calls unwrapped.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, 
serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, 
iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
source));
             }
         }
         return result;
     }
 
+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling 
{@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw 
{@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, 
Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance 
past the first broken
+        // plugin class, or if it will continue to fail on that broken class 
for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, 
which describes
+        // the behavior we are trying to mitigate with this logic as buggy, 
but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is 
unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader 
cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to 
be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of 
log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two 
consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance 
and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        || !Objects.equals(lastError.getMessage(), 
t.getMessage())) {
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
+                }
+                lastError = t;
+            }
+        }
+        log.error("Received excessive ServiceLoader errors: assuming the 
runtime ServiceLoader implementation cannot " +
+                        "skip faulty implementations. Use a different JRE, 
resolve errors for all {} in {}, or " +

Review Comment:
   Nit: I still think it's a bit misleading to specify the plugin class here. 
There may be a ton of broken plugins in this directory and if a user fixes all 
sink connectors and then sees this same error again telling them to fix all 
converters, they might rightfully be annoyed.
   
   I think it's fine to just say "resolve errors for plugins in 
$PLUGIN_LOCATION" and leave it at that.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, 
serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, 
iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class<? extends T> pluginKlass = (Class<? extends T>) 
pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), 
source));
             }
         }
         return result;
     }
 
+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling 
{@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw 
{@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param <T> Type being iterated over by the ServiceLoader
+     * @param <U> Return value of the passed-in function
+     */
+    private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, 
Supplier<U> function) {
+        // It's difficult to know for sure if the iterator was able to advance 
past the first broken
+        // plugin class, or if it will continue to fail on that broken class 
for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, 
which describes
+        // the behavior we are trying to mitigate with this logic as buggy, 
but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is 
unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader 
cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to 
be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of 
log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two 
consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance 
and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        || !Objects.equals(lastError.getMessage(), 
t.getMessage())) {
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);
+                }
+                lastError = t;
+            }
+        }
+        log.error("Received excessive ServiceLoader errors: assuming the 
runtime ServiceLoader implementation cannot " +
+                        "skip faulty implementations. Use a different JRE, 
resolve errors for all {} in {}, or " +
+                        "disable service loader scanning.",

Review Comment:
   Since we'll be silently enabling service loader scanning by setting the 
default to `HYBRID_WARN`, can we explicitly instruct users on how to disable 
service loader scanning (i.e., set `plugin.discovery` to `ONLY_SCAN`)?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##########
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String 
version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }
 
     @SuppressWarnings("unchecked")
-    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, ClassLoader loader) {
+    protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> 
klass, PluginSource source) {
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
-        ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator<T> iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));
+        Iterator<T> iterator = handleLinkageError(klass, source, 
serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, 
iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   We provide a null `location` for the `PluginSource` corresponding to the 
system classpath, which won't look great in these log messages.
   
   Since that field (and its accessor method) is currently used only for log 
messages, what do you think about altering `PluginSource::location` to return a 
string, and returning "System classpath" for the system classpath source?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -75,66 +78,67 @@ public static <T> String versionFor(Class<? extends T> 
pluginKlass) throws Refle
 
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
-        ClassLoader loader = source.loader();
         ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[]{loader});
+        builder.setClassLoaders(new ClassLoader[]{source.loader()});
         builder.addUrls(source.urls());
         builder.setScanners(new SubTypesScanner());
         builder.useParallelExecutor();
         Reflections reflections = new InternalReflections(builder);
 
         return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, loader),
-                getPluginDesc(reflections, SourceConnector.class, loader),
-                getPluginDesc(reflections, Converter.class, loader),
-                getPluginDesc(reflections, HeaderConverter.class, loader),
-                getTransformationPluginDesc(loader, reflections),
-                getPredicatePluginDesc(loader, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
+                getPluginDesc(reflections, SinkConnector.class, source),
+                getPluginDesc(reflections, SourceConnector.class, source),
+                getPluginDesc(reflections, Converter.class, source),
+                getPluginDesc(reflections, HeaderConverter.class, source),
+                getTransformationPluginDesc(source, reflections),
+                getPredicatePluginDesc(source, reflections),
+                getServiceLoaderPluginDesc(ConfigProvider.class, source),
+                getServiceLoaderPluginDesc(ConnectRestExtension.class, source),
+                
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source)
         );
     }
 
     @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
-        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, loader);
+    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(PluginSource source, Reflections reflections) {
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Predicate.class, source);
     }
 
     @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
-        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, loader);
+    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(PluginSource source, Reflections reflections) {
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, Transformation.class, source);
     }
 
     private <T> SortedSet<PluginDesc<T>> getPluginDesc(
             Reflections reflections,
             Class<T> klass,
-            ClassLoader loader
+            PluginSource source
     ) {
         Set<Class<? extends T>> plugins;
         try {
             plugins = reflections.getSubTypesOf(klass);
         } catch (ReflectionsException e) {
-            log.debug("Reflections scanner could not find any classes for 
URLs: " +
-                    reflections.getConfiguration().getUrls(), e);
+            log.debug("Reflections scanner could not find any {} in {} for 
URLs: {}",
+                    klass, source.location(), source.urls(), e);
             return Collections.emptySortedSet();
         }
 
         SortedSet<PluginDesc<T>> result = new TreeSet<>();
         for (Class<? extends T> pluginKlass : plugins) {
             if (!PluginUtils.isConcrete(pluginKlass)) {
-                log.debug("Skipping {} as it is not concrete implementation", 
pluginKlass);
+                log.debug("Skipping {} in {} as it is not concrete 
implementation", pluginKlass, source.location());
                 continue;
             }
-            if (pluginKlass.getClassLoader() != loader) {
-                log.debug("{} from other classloader {} is visible from {}, 
excluding to prevent isolated loading",
-                        pluginKlass.getSimpleName(), 
pluginKlass.getClassLoader(), loader);
+            if (pluginKlass.getClassLoader() != source.loader()) {
+                log.debug("{} from other classloader {} is visible from {} 
excluding to prevent isolated loading",

Review Comment:
   Is this change intentional? The comma made sense IMO (though technically I 
suppose it should be a semicolon).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to