gharris1727 commented on code in PR #14055:
URL: https://github.com/apache/kafka/pull/14055#discussion_r1270918323
##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##
@@ -63,16 +67,72 @@ public Plugins(Map props) {
// VisibleForTesting
Plugins(Map props, ClassLoader parent, ClassLoaderFactory
factory) {
String pluginPath = WorkerConfig.pluginPath(props);
+PluginDiscoveryMode discoveryMode =
WorkerConfig.pluginDiscovery(props);
List pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = factory.newDelegatingClassLoader(parent);
Set pluginSources =
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-scanResult = initLoaders(pluginSources);
+scanResult = initLoaders(pluginSources, discoveryMode);
}
-private PluginScanResult initLoaders(Set pluginSources) {
-PluginScanResult reflectiveScanResult = new
ReflectionScanner().discoverPlugins(pluginSources);
-delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-return reflectiveScanResult;
+public PluginScanResult initLoaders(Set pluginSources,
PluginDiscoveryMode discoveryMode) {
+PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+PluginScanResult serviceLoadingScanResult;
+try {
+serviceLoadingScanResult =
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+new ServiceLoaderScanner().discoverPlugins(pluginSources)
: empty;
+} catch (Throwable t) {
+log.error("Unable to perform ServiceLoader scanning as requested
by {}={}, this error may be avoided by reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
PluginDiscoveryMode.ONLY_SCAN, t);
+throw t;
+}
+PluginScanResult reflectiveScanResult =
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+PluginScanResult scanResult = new
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+maybeReportHybridDiscoveryIssue(discoveryMode,
serviceLoadingScanResult, scanResult);
+delegatingLoader.installDiscoveredPlugins(scanResult);
+return scanResult;
+}
+
+private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult
mergedResult) {
+SortedSet> missingPlugins = new TreeSet<>();
+mergedResult.forEach(missingPlugins::add);
+serviceLoadingScanResult.forEach(missingPlugins::remove);
+if (missingPlugins.isEmpty()) {
+switch (discoveryMode) {
+case ONLY_SCAN:
+log.debug("Service loading of plugins disabled, consider
reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
PluginDiscoveryMode.HYBRID_WARN);
+break;
+case HYBRID_WARN:
+case HYBRID_FAIL:
+log.warn("All plugins have ServiceLoader manifests,
consider reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
PluginDiscoveryMode.SERVICE_LOAD);
+break;
+case SERVICE_LOAD:
+log.debug("Reflective loading of plugins disabled, plugins
without manifests will not be visible");
+break;
+default:
+throw new IllegalArgumentException("Unknown discovery
mode");
+}
+} else {
+String message = String.format(
+"Plugins are missing ServiceLoader manifests, these
plugins will not be visible with %s=%s: %s",
Review Comment:
> For many of them, this is going to be the first time hearing about new
discovery logic; we should make this message as informative as possible if we
want them to start to make use of it.
> Link to a docs section on the website describing the new plugin discovery
logic
I think explaining everything in one warning/error message is
unreasonable, and linking out to external documentation is the better strategy.
The error message is already lengthy enough with each plugin listing.
##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
+ "by the worker's scanner before config providers are initialized
and used to "
+ "replace variables.";
+public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to
discover plugins provided in the "