[GitHub] [kafka] gharris1727 commented on a diff in pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)

2023-07-21 Thread via GitHub


gharris1727 commented on code in PR #14055:
URL: https://github.com/apache/kafka/pull/14055#discussion_r1271051021


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##
@@ -63,16 +67,72 @@ public Plugins(Map props) {
 // VisibleForTesting
 Plugins(Map props, ClassLoader parent, ClassLoaderFactory 
factory) {
 String pluginPath = WorkerConfig.pluginPath(props);
+PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
 List pluginLocations = PluginUtils.pluginLocations(pluginPath);
 delegatingLoader = factory.newDelegatingClassLoader(parent);
 Set pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-scanResult = initLoaders(pluginSources);
+scanResult = initLoaders(pluginSources, discoveryMode);
 }
 
-private PluginScanResult initLoaders(Set pluginSources) {
-PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-return reflectiveScanResult;
+public PluginScanResult initLoaders(Set pluginSources, 
PluginDiscoveryMode discoveryMode) {
+PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+PluginScanResult serviceLoadingScanResult;
+try {
+serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+} catch (Throwable t) {
+log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+throw t;
+}
+PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+delegatingLoader.installDiscoveredPlugins(scanResult);
+return scanResult;
+}
+
+private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+SortedSet> missingPlugins = new TreeSet<>();
+mergedResult.forEach(missingPlugins::add);
+serviceLoadingScanResult.forEach(missingPlugins::remove);
+if (missingPlugins.isEmpty()) {
+switch (discoveryMode) {
+case ONLY_SCAN:
+log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+break;
+case HYBRID_WARN:
+case HYBRID_FAIL:
+log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+break;
+case SERVICE_LOAD:
+log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+break;
+default:
+throw new IllegalArgumentException("Unknown discovery 
mode");
+}
+} else {
+String message = String.format(
+"Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",

Review Comment:
   Linked documentation is added in #14068 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14055: KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898)

2023-07-21 Thread via GitHub


gharris1727 commented on code in PR #14055:
URL: https://github.com/apache/kafka/pull/14055#discussion_r1270918323


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##
@@ -63,16 +67,72 @@ public Plugins(Map props) {
 // VisibleForTesting
 Plugins(Map props, ClassLoader parent, ClassLoaderFactory 
factory) {
 String pluginPath = WorkerConfig.pluginPath(props);
+PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
 List pluginLocations = PluginUtils.pluginLocations(pluginPath);
 delegatingLoader = factory.newDelegatingClassLoader(parent);
 Set pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-scanResult = initLoaders(pluginSources);
+scanResult = initLoaders(pluginSources, discoveryMode);
 }
 
-private PluginScanResult initLoaders(Set pluginSources) {
-PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-return reflectiveScanResult;
+public PluginScanResult initLoaders(Set pluginSources, 
PluginDiscoveryMode discoveryMode) {
+PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+PluginScanResult serviceLoadingScanResult;
+try {
+serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+} catch (Throwable t) {
+log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+throw t;
+}
+PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+delegatingLoader.installDiscoveredPlugins(scanResult);
+return scanResult;
+}
+
+private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+SortedSet> missingPlugins = new TreeSet<>();
+mergedResult.forEach(missingPlugins::add);
+serviceLoadingScanResult.forEach(missingPlugins::remove);
+if (missingPlugins.isEmpty()) {
+switch (discoveryMode) {
+case ONLY_SCAN:
+log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+break;
+case HYBRID_WARN:
+case HYBRID_FAIL:
+log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+break;
+case SERVICE_LOAD:
+log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+break;
+default:
+throw new IllegalArgumentException("Unknown discovery 
mode");
+}
+} else {
+String message = String.format(
+"Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",

Review Comment:
   > For many of them, this is going to be the first time hearing about new 
discovery logic; we should make this message as informative as possible if we 
want them to start to make use of it.
   > Link to a docs section on the website describing the new plugin discovery 
logic
   
   I think explaining everything in one warning/error message is 
unreasonable, and linking out to external documentation is the better strategy. 
The error message is already lengthy enough with each plugin listed.
   
   
   



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
 + "by the worker's scanner before config providers are initialized 
and used to "
 + "replace variables.";
 
+public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "