gharris1727 commented on code in PR #14055: URL: https://github.com/apache/kafka/pull/14055#discussion_r1271051021
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java: ########## @@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) { // VisibleForTesting Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); + PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props); List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = factory.newDelegatingClassLoader(parent); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); - scanResult = initLoaders(pluginSources); + scanResult = initLoaders(pluginSources, discoveryMode); } - private PluginScanResult initLoaders(Set<PluginSource> pluginSources) { - PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources); - delegatingLoader.installDiscoveredPlugins(reflectiveScanResult); - return reflectiveScanResult; + public PluginScanResult initLoaders(Set<PluginSource> pluginSources, PluginDiscoveryMode discoveryMode) { + PluginScanResult empty = new PluginScanResult(Collections.emptyList()); + PluginScanResult serviceLoadingScanResult; + try { + serviceLoadingScanResult = PluginDiscoveryMode.serviceLoad(discoveryMode) ? + new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty; + } catch (Throwable t) { + log.error("Unable to perform ServiceLoader scanning as requested by {}={}, this error may be avoided by reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode, + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN, t); + throw t; + } + PluginScanResult reflectiveScanResult = PluginDiscoveryMode.reflectivelyScan(discoveryMode) ? + new ReflectionScanner().discoverPlugins(pluginSources) : empty; + PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult)); + maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult); + delegatingLoader.installDiscoveredPlugins(scanResult); + return scanResult; + } + + private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) { + SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>(); + mergedResult.forEach(missingPlugins::add); + serviceLoadingScanResult.forEach(missingPlugins::remove); + if (missingPlugins.isEmpty()) { + switch (discoveryMode) { + case ONLY_SCAN: + log.debug("Service loading of plugins disabled, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN); + break; + case HYBRID_WARN: + case HYBRID_FAIL: + log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD); + break; + case SERVICE_LOAD: + log.debug("Reflective loading of plugins disabled, plugins without manifests will not be visible"); + break; + default: + throw new IllegalArgumentException("Unknown discovery mode"); + } + } else { + String message = String.format( + "Plugins are missing ServiceLoader manifests, these plugins will not be visible with %s=%s: %s", Review Comment: Linked documentation is added in #14068 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org