gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267103768
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ########## @@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) { - return new PluginDesc(plugin, version, loader); + protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) { + return new PluginDesc(plugin, version, source.loader()); } @SuppressWarnings("unchecked") - protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) { + protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) { SortedSet<PluginDesc<T>> result = new TreeSet<>(); - ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader); - for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) { - try (LoaderSwap loaderSwap = withClassLoader(loader)) { + ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader())); + Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator); + while (handleLinkageError(klass, source, iterator::hasNext)) { + try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { - pluginImpl = iterator.next(); + pluginImpl = handleLinkageError(klass, source, iterator::next); } catch (ServiceConfigurationError t) { - log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); + log.error("Failed to discover {} in {}{}", + klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); continue; } Class<? extends T> pluginKlass = (Class<? 
extends T>) pluginImpl.getClass(); - if (pluginKlass.getClassLoader() != loader) { + if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader); + pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); continue; } - result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader)); + result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source)); } } return result; } + /** + * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s. + * + * @param klass The plugin superclass which is being loaded + * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError} + * @return the return value of function + * @throws Error errors thrown by the passed-in function + * @param <T> Type being iterated over by the ServiceLoader + * @param <U> Return value of the passed-in function + */ + private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) { + // It's difficult to know for sure if the iterator was able to advance past the first broken + // plugin class, or if it will continue to fail on that broken class for any subsequent calls + // to Iterator::hasNext or Iterator::next + // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes + // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix + // in the JDK standard library ServiceLoader implementation is unlikely to land + LinkageError lastError = null; + // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin, + // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number + // of plugins in a single plugin location, and to limit the amount of log-spam on startup. 
+ for (int i = 0; i < 100; i++) { + try { + return function.get(); + } catch (LinkageError t) { + // As an optimization, hide subsequent error logs if two consecutive errors look similar. + // This reduces log-spam for iterators which cannot advance and rethrow the same exception. + if (lastError == null + || !Objects.equals(lastError.getClass(), t.getClass()) + || !Objects.equals(lastError.getMessage(), t.getMessage())) { + log.error("Failed to discover {} in {}{}", + klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + } + lastError = t; + } + } + log.error("Received excessive ServiceLoader errors: assuming the runtime ServiceLoader implementation cannot " + + "skip faulty implementations. Use a different JRE, resolve errors for all {} in {}, or " + + "disable service loader scanning.", Review Comment: This code-path is used even in ONLY_SCAN, so this remediation advice is actually misplaced. I'll remove this and make sure that, when we later implement the plugin.discovery mode, this error prints a separate recommendation to reconfigure the worker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org