C0urante commented on code in PR #14055:
URL: https://github.com/apache/kafka/pull/14055#discussion_r1270745253


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "
+            + "plugin.path configuration. This can be one of multiple values 
with the following meanings:\n"

Review Comment:
   To be clear--this property will only affect isolated plugins; it will not 
affect plugins installed directly onto the worker's classpath, correct?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -199,6 +213,11 @@ protected static ConfigDef baseConfigDef() {
                         null,
                         Importance.LOW,
                         PLUGIN_PATH_DOC)
+                .define(PLUGIN_DISCOVERY_CONFIG,
+                        Type.STRING,
+                        PluginDiscoveryMode.HYBRID_WARN.toString(),

Review Comment:
   We can add a validator for this:
   ```suggestion
                           PluginDiscoveryMode.HYBRID_WARN.toString(),
                           
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)),
   ```
   
   It may not be applied before we parse the property value, but it also helps 
with our autogenerated docs for the property (for example, see the "Valid 
Values" section in the [docs for 
exactly.once.source.support](https://kafka.apache.org/documentation.html#connectconfigs_exactly.once.source.support)).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;

Review Comment:
   Should we wrap this in a `ConnectException` that uses the same error message 
that we log above? Users are more likely to see messages in the exception that 
killed the worker than ones earlier in the logs.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+/**
+ * Method to use to discover plugins usable on a Connect worker.

Review Comment:
   Also, related to the question in `WorkerConfig`--if the way we use this enum 
doesn't affect how plugins installed directly onto the classpath are 
discovered, perhaps we should say "isolated plugins" instead of just "plugins", 
and/or add a note about classpath-installed plugins being loaded no matter what?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");
+            }
+        } else {
+            String message = String.format(
+                    "Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",

Review Comment:
   Also also, if the point about classpath-installed plugins being unaffected 
by the plugin discovery mode is correct, should we either filter these plugins 
out from `missingPlugins` or clarify that those plugins will still be usable 
with `SERVICE_LOAD`?
   
   I suppose "visible" is accurate in the sense that these plugins won't be 
listed by the `GET /connector-plugins` endpoint, but that may confuse users if 
they notice that they're still able to use these plugins in a connector config 
after switching to `SERVICE_LOAD`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "
+            + "plugin.path configuration. This can be one of multiple values 
with the following meanings:\n"
+            + "* ONLY_SCAN: Discover plugins only by reflection. "
+            + "Plugins which are not discoverable by ServiceLoader will not 
impact worker startup.\n"
+            + "* HYBRID_WARN: Discover plugins reflectively and by 
ServiceLoader. "
+            + "Plugins which are not discoverable by ServiceLoader will print 
warnings during worker startup.\n"
+            + "* HYBRID_FAIL: Discover plugins reflectively and by 
ServiceLoader."
+            + "Plugins which are not discoverable by ServiceLoader will cause 
worker startup to fail.\n"
+            + "* SERVICE_LOAD: Discover plugins only by ServiceLoader. Faster 
startup than prior modes. "

Review Comment:
   Nit: simpler language
   
   ```suggestion
               + "* SERVICE_LOAD: Discover plugins only by ServiceLoader. 
Faster startup than other modes. "
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -401,6 +420,11 @@ public static String pluginPath(Map<String, String> props) 
{
         return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
     }
 
+    public static PluginDiscoveryMode pluginDiscovery(Map<String, String> 
props) {
+        String value = 
props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN.toString());
+        return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));

Review Comment:
   Can we catch `IllegalArgumentException` here and wrap it in a 
`ConfigException` with a more human-friendly error message?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "
+            + "plugin.path configuration. This can be one of multiple values 
with the following meanings:\n"
+            + "* ONLY_SCAN: Discover plugins only by reflection. "

Review Comment:
   Nit: can we stay away from raw strings and use, e.g., 
`PluginDiscoveryMode.ONLY_SCAN.name()`? Can also statically import the enum 
constants to avoid having to qualify these with the class name.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "
+            + "plugin.path configuration. This can be one of multiple values 
with the following meanings:\n"
+            + "* ONLY_SCAN: Discover plugins only by reflection. "
+            + "Plugins which are not discoverable by ServiceLoader will not 
impact worker startup.\n"
+            + "* HYBRID_WARN: Discover plugins reflectively and by 
ServiceLoader. "
+            + "Plugins which are not discoverable by ServiceLoader will print 
warnings during worker startup.\n"
+            + "* HYBRID_FAIL: Discover plugins reflectively and by 
ServiceLoader."

Review Comment:
   ```suggestion
               + "* HYBRID_FAIL: Discover plugins reflectively and by 
ServiceLoader. "
   ```
   
   Can't wait for Java 15 text blocks 🙏



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+/**
+ * Method to use to discover plugins usable on a Connect worker.
+ */
+public enum PluginDiscoveryMode {
+
+    /**
+     * Scan for plugins reflectively. This corresponds to the legacy behavior 
of Connect prior to KIP-898.
+     * <p>Note: the following plugins are still loaded using {@link 
java.util.ServiceLoader} in this mode:
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.config.provider.ConfigProvider}</li>
+     *     <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
+     *     <li>{@link 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
+     * </ul>
+     */
+    ONLY_SCAN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Emit warnings if one or more plugins is not available via {@link 
java.util.ServiceLoader}
+     */
+    HYBRID_WARN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Fail worker during startup if one or more plugins is not available via 
{@link java.util.ServiceLoader}
+     */
+    HYBRID_FAIL,
+    /**
+     * Discover plugins via {@link java.util.ServiceLoader} only.
+     * Plugins will not be present in the REST API if it is not available via 
{@link java.util.ServiceLoader}

Review Comment:
   ```suggestion
        * Plugins will not be present in the REST API if they are not available 
via {@link java.util.ServiceLoader}
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+/**
+ * Method to use to discover plugins usable on a Connect worker.
+ */
+public enum PluginDiscoveryMode {
+
+    /**
+     * Scan for plugins reflectively. This corresponds to the legacy behavior 
of Connect prior to KIP-898.
+     * <p>Note: the following plugins are still loaded using {@link 
java.util.ServiceLoader} in this mode:
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.config.provider.ConfigProvider}</li>
+     *     <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
+     *     <li>{@link 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
+     * </ul>
+     */
+    ONLY_SCAN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Emit warnings if one or more plugins is not available via {@link 
java.util.ServiceLoader}
+     */
+    HYBRID_WARN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Fail worker during startup if one or more plugins is not available via 
{@link java.util.ServiceLoader}
+     */
+    HYBRID_FAIL,
+    /**
+     * Discover plugins via {@link java.util.ServiceLoader} only.
+     * Plugins will not be present in the REST API if it is not available via 
{@link java.util.ServiceLoader}
+     */
+    SERVICE_LOAD;
+
+    public static boolean reflectivelyScan(PluginDiscoveryMode 
pluginDiscoveryMode) {
+        return pluginDiscoveryMode != SERVICE_LOAD;
+    }
+
+    public static boolean serviceLoad(PluginDiscoveryMode pluginDiscoveryMode) 
{
+        return pluginDiscoveryMode != ONLY_SCAN;
+    }

Review Comment:
   The only reason I can think of to implement this logic with static methods 
instead of instance methods is to gracefully handle null values, but it seems 
like we disallow those anyways based on how we parse them in 
`WorkerConfig::pluginDiscovery`.
   
   Thoughts on making these instance methods?
   ```suggestion
       public boolean reflectivelyScan() {
           return this != SERVICE_LOAD;
       }
   
       public boolean serviceLoad() {
           return this != ONLY_SCAN;
       }
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+/**
+ * Method to use to discover plugins usable on a Connect worker.

Review Comment:
   Nit: "Method" is a bit misleading (I thought this was incorrectly documented 
at first). Some synonyms from Google that could also work include "procedure", 
"technique", "process", and "mechanism".



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",

Review Comment:
   I think we should be more explicit about how reconfiguration isn't 
guaranteed to solve the problem:
   ```suggestion
               log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}. It may be possible to fix this issue by reconfiguring {}={}",
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -122,6 +124,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins provided in the "
+            + "plugin.path configuration. This can be one of multiple values 
with the following meanings:\n"
+            + "* ONLY_SCAN: Discover plugins only by reflection. "
+            + "Plugins which are not discoverable by ServiceLoader will not 
impact worker startup.\n"
+            + "* HYBRID_WARN: Discover plugins reflectively and by 
ServiceLoader. "
+            + "Plugins which are not discoverable by ServiceLoader will print 
warnings during worker startup.\n"
+            + "* HYBRID_FAIL: Discover plugins reflectively and by 
ServiceLoader."

Review Comment:
   ```suggestion
               + "* HYBRID_FAIL: Discover plugins reflectively and by 
ServiceLoader. "
   ```
   
   Can't wait for Java 15 text blocks 🙏



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",

Review Comment:
   Is this likely to be useful at debug level? At that point most people are 
usually trying to diagnose bugs or some other kind of unusual behavior; IMO we 
should either bump to info level or remove.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");

Review Comment:
   Can we include the discovery mode in the error message?
   ```suggestion
                       throw new IllegalArgumentException("Unknown discovery 
mode: " + discoveryMode);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");
+            }
+        } else {
+            String message = String.format(
+                    "Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
+                    PluginDiscoveryMode.SERVICE_LOAD,
+                    missingPlugins.stream()
+                            .map(pluginDesc -> pluginDesc.location() + "\t" + 
pluginDesc.className() + "\t" + pluginDesc.version())
+                            .collect(Collectors.joining("\n", "[\n", "\n]")));
+            switch (discoveryMode) {
+                case HYBRID_WARN:
+                    log.warn(message);
+                    break;
+                case HYBRID_FAIL:
+                    throw new ConnectException(message);
+                default:

Review Comment:
   Maybe we can add cases for `ONLY_SCAN` and `SERVICE_LOAD` that give a more 
useful error message? "Unknown" doesn't seem accurate here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");
+            }
+        } else {
+            String message = String.format(
+                    "Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
+                    PluginDiscoveryMode.SERVICE_LOAD,
+                    missingPlugins.stream()
+                            .map(pluginDesc -> pluginDesc.location() + "\t" + 
pluginDesc.className() + "\t" + pluginDesc.version())
+                            .collect(Collectors.joining("\n", "[\n", "\n]")));
+            switch (discoveryMode) {
+                case HYBRID_WARN:
+                    log.warn(message);
+                    break;
+                case HYBRID_FAIL:
+                    throw new ConnectException(message);
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");

Review Comment:
   Same thought RE including enum in error message:
   ```suggestion
                       throw new IllegalArgumentException("Unknown discovery 
mode: " + discoveryMode);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {

Review Comment:
   IMO it's worth it to add some unit tests for this method. We can use the 
[LogCaptureAppender](https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/utils/LogCaptureAppender.java)
 class to make assertions about log messages and their levels; see 
[here](https://github.com/apache/kafka/blob/2e30dd894f4f502f24ad797e1018a523fc138574/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java#L80-L88)
 and 
[here](https://github.com/apache/kafka/blob/4bba2c8a32a3e35f6870cf3f738c0eef8bb652d2/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java#L103-L106)
 for a couple examples.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");
+            }
+        } else {
+            String message = String.format(
+                    "Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",

Review Comment:
   Nit: language
   ```suggestion
                       "One or more plugins are missing ServiceLoader manifests 
and will not be visible with %s=%s: %s",
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");

Review Comment:
   Same thought RE debug level



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+/**
+ * Method to use to discover plugins usable on a Connect worker.

Review Comment:
   Also also, can't hurt to link the KIP here:
   ```suggestion
    * Method to use to discover plugins usable on a Connect worker.
    * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery";>KIP-898</a>
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -276,6 +277,7 @@ public void startConnect() {
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");

Review Comment:
   Crazy how powerful adding this single line to our existing tests is! 🙌 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -63,16 +67,72 @@ public Plugins(Map<String, String> props) {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = 
PluginDiscoveryMode.serviceLoad(discoveryMode) ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            log.error("Unable to perform ServiceLoader scanning as requested 
by {}={}, this error may be avoided by reconfiguring {}={}",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN, t);
+            throw t;
+        }
+        PluginScanResult reflectiveScanResult = 
PluginDiscoveryMode.reflectivelyScan(discoveryMode) ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    private static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            switch (discoveryMode) {
+                case ONLY_SCAN:
+                    log.debug("Service loading of plugins disabled, consider 
reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN);
+                    break;
+                case HYBRID_WARN:
+                case HYBRID_FAIL:
+                    log.warn("All plugins have ServiceLoader manifests, 
consider reconfiguring {}={}",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+                    break;
+                case SERVICE_LOAD:
+                    log.debug("Reflective loading of plugins disabled, plugins 
without manifests will not be visible");
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown discovery 
mode");
+            }
+        } else {
+            String message = String.format(
+                    "Plugins are missing ServiceLoader manifests, these 
plugins will not be visible with %s=%s: %s",

Review Comment:
   Also, should we include instructions on what users should do in this case? 
For many of them, this is going to be the first time hearing about new 
discovery logic; we should make this message as informative as possible if we 
want them to start to make use of it.
   
   Some options include:
   - Suggest that they add service loader manifests for these plugins (possibly 
by mentioning the CLI tool added in https://github.com/apache/kafka/pull/14064)
   - Link to a docs section on the website describing the new plugin discovery 
logic
   - Link to a docs section for the Java standard library describing the 
service loader feature
   - In the case of `HYBRID_FAIL`, suggest changing the discovery mode if the 
missing plugins cannot be updated



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -276,6 +277,7 @@ public void startConnect() {
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");

Review Comment:
   Crazy how powerful adding this single line to our existing tests is! 🙌 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to