This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff4fed5cbe0 KAFKA-15031: Add plugin.discovery to Connect worker 
configuration (KIP-898) (#14055)
ff4fed5cbe0 is described below

commit ff4fed5cbe01480f671cc5070fac97d16ca0a5ba
Author: Greg Harris <greg.har...@aiven.io>
AuthorDate: Tue Aug 8 10:06:35 2023 -0700

    KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) 
(#14055)
    
    Reviewers: Chris Egerton <chr...@aiven.io>
---
 .../org.apache.kafka.connect.storage.Converter     |  16 +++
 ...rg.apache.kafka.connect.storage.HeaderConverter |  17 +++
 .../org.apache.kafka.connect.sink.SinkConnector    |  16 +++
 ...org.apache.kafka.connect.source.SourceConnector |  16 +++
 .../org.apache.kafka.connect.sink.SinkConnector    |  16 +++
 ...org.apache.kafka.connect.source.SourceConnector |  16 +++
 .../org.apache.kafka.connect.storage.Converter     |  16 +++
 ...rg.apache.kafka.connect.storage.HeaderConverter |  16 +++
 ...org.apache.kafka.connect.source.SourceConnector |  18 ++++
 .../apache/kafka/connect/runtime/WorkerConfig.java |  35 ++++++
 .../runtime/isolation/PluginDiscoveryMode.java     |  65 ++++++++++++
 .../kafka/connect/runtime/isolation/Plugins.java   |  62 ++++++++++-
 .../org.apache.kafka.connect.sink.SinkConnector    |  17 +++
 ...org.apache.kafka.connect.source.SourceConnector |  18 ++++
 .../org.apache.kafka.connect.storage.Converter     |  21 ++++
 ...rg.apache.kafka.connect.storage.HeaderConverter |  21 ++++
 .../connect/runtime/isolation/PluginsTest.java     | 118 +++++++++++++++++++++
 .../runtime/isolation/SynchronizationTest.java     |   5 +-
 .../util/clusters/EmbeddedConnectCluster.java      |   2 +
 .../org.apache.kafka.connect.sink.SinkConnector    |  20 ++++
 ...org.apache.kafka.connect.source.SourceConnector |  25 +++++
 .../org.apache.kafka.connect.storage.Converter     |  20 ++++
 ...rg.apache.kafka.connect.storage.HeaderConverter |  20 ++++
 ....apache.kafka.connect.transforms.Transformation |  23 ++++
 ...e.kafka.connect.transforms.predicates.Predicate |  17 +++
 ....apache.kafka.connect.transforms.Transformation |  41 +++++++
 ...e.kafka.connect.transforms.predicates.Predicate |  18 ++++
 27 files changed, 688 insertions(+), 7 deletions(-)

diff --git 
a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..78e322373bb
--- /dev/null
+++ 
b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.storage.StringConverter
\ No newline at end of file
diff --git 
a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
 
b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..42e02d13aaf
--- /dev/null
+++ 
b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,17 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.storage.SimpleHeaderConverter
+org.apache.kafka.connect.storage.StringConverter
\ No newline at end of file
diff --git 
a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..4a1f3a9baf2
--- /dev/null
+++ 
b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector
\ No newline at end of file
diff --git 
a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..ab7f14d0aeb
--- /dev/null
+++ 
b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector
\ No newline at end of file
diff --git 
a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..4acecd76b5a
--- /dev/null
+++ 
b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.file.FileStreamSinkConnector
\ No newline at end of file
diff --git 
a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..66a0c5d8588
--- /dev/null
+++ 
b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.file.FileStreamSourceConnector
\ No newline at end of file
diff --git 
a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..0ea37b79c01
--- /dev/null
+++ 
b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.json.JsonConverter
\ No newline at end of file
diff --git 
a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
 
b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..0ea37b79c01
--- /dev/null
+++ 
b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,16 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.json.JsonConverter
\ No newline at end of file
diff --git 
a/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..4836e08f3e6
--- /dev/null
+++ 
b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,18 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.mirror.MirrorCheckpointConnector
+org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
+org.apache.kafka.connect.mirror.MirrorSourceConnector
\ No newline at end of file
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 6ba7cea7406..c1fd69317bc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -25,7 +25,9 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.slf4j.Logger;
@@ -35,6 +37,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
@@ -42,6 +45,10 @@ import java.util.concurrent.ExecutionException;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN;
+import static 
org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN;
+import static 
org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL;
+import static 
org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD;
 
 /**
  * Common base class providing configuration for Kafka Connect workers, 
whether standalone or distributed.
@@ -122,6 +129,18 @@ public class WorkerConfig extends AbstractConfig {
             + "by the worker's scanner before config providers are initialized 
and used to "
             + "replace variables.";
 
+    public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery";
+    protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to 
discover plugins present in the classpath "
+            + "and plugin.path configuration. This can be one of multiple 
values with the following meanings:\n"
+            + "* " + ONLY_SCAN + ": Discover plugins only by reflection. "
+            + "Plugins which are not discoverable by ServiceLoader will not 
impact worker startup.\n"
+            + "* " + HYBRID_WARN + ": Discover plugins reflectively and by 
ServiceLoader. "
+            + "Plugins which are not discoverable by ServiceLoader will print 
warnings during worker startup.\n"
+            + "* " + HYBRID_FAIL + ": Discover plugins reflectively and by 
ServiceLoader. "
+            + "Plugins which are not discoverable by ServiceLoader will cause 
worker startup to fail.\n"
+            + "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. 
Faster startup than other modes. "
+            + "Plugins which are not discoverable by ServiceLoader may not be 
usable.";
+
     public static final String CONFIG_PROVIDERS_CONFIG = "config.providers";
     protected static final String CONFIG_PROVIDERS_DOC =
             "Comma-separated names of <code>ConfigProvider</code> classes, 
loaded and used "
@@ -199,6 +218,12 @@ public class WorkerConfig extends AbstractConfig {
                         null,
                         Importance.LOW,
                         PLUGIN_PATH_DOC)
+                .define(PLUGIN_DISCOVERY_CONFIG,
+                        Type.STRING,
+                        PluginDiscoveryMode.HYBRID_WARN.toString(),
+                        
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)),
+                        Importance.LOW,
+                        PLUGIN_DISCOVERY_DOC)
                 .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG,
                         30000, atLeast(0), Importance.LOW,
                         CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
@@ -401,6 +426,16 @@ public class WorkerConfig extends AbstractConfig {
         return props.get(WorkerConfig.PLUGIN_PATH_CONFIG);
     }
 
+    public static PluginDiscoveryMode pluginDiscovery(Map<String, String> 
props) {
+        String value = 
props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN.toString());
+        try {
+            return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + 
" value, must be one of "
+                    + 
Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class)));
+        }
+    }
+
     public WorkerConfig(ConfigDef definition, Map<String, String> props) {
         super(definition, props);
         logInternalConverterRemovalWarnings(props);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java
new file mode 100644
index 00000000000..bd390a929b4
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Locale;
+
+/**
+ * Strategy to use to discover plugins usable on a Connect worker.
+ * @see <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery";>KIP-898</a>
+ */
+public enum PluginDiscoveryMode {
+
+    /**
+     * Scan for plugins reflectively. This corresponds to the legacy behavior 
of Connect prior to KIP-898.
+     * <p>Note: the following plugins are still loaded using {@link 
java.util.ServiceLoader} in this mode:
+     * <ul>
+     *     <li>{@link 
org.apache.kafka.common.config.provider.ConfigProvider}</li>
+     *     <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li>
+     *     <li>{@link 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li>
+     * </ul>
+     */
+    ONLY_SCAN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Emit warnings if one or more plugins is not available via {@link 
java.util.ServiceLoader}
+     */
+    HYBRID_WARN,
+    /**
+     * Scan for plugins reflectively and via {@link java.util.ServiceLoader}.
+     * Fail worker during startup if one or more plugins is not available via 
{@link java.util.ServiceLoader}
+     */
+    HYBRID_FAIL,
+    /**
+     * Discover plugins via {@link java.util.ServiceLoader} only.
+     * Plugins may not be usable if they are not available via {@link 
java.util.ServiceLoader}
+     */
+    SERVICE_LOAD;
+
+    public boolean reflectivelyScan() {
+        return this != SERVICE_LOAD;
+    }
+
+    public boolean serviceLoad() {
+        return this != ONLY_SCAN;
+    }
+
+    @Override
+    public String toString() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 36e20270abc..72fe40a50a5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -39,11 +39,15 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 public class Plugins {
 
@@ -63,16 +67,64 @@ public class Plugins {
     // VisibleForTesting
     Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory 
factory) {
         String pluginPath = WorkerConfig.pluginPath(props);
+        PluginDiscoveryMode discoveryMode = 
WorkerConfig.pluginDiscovery(props);
         Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
         delegatingLoader = factory.newDelegatingClassLoader(parent);
         Set<PluginSource> pluginSources = 
PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
-        scanResult = initLoaders(pluginSources);
+        scanResult = initLoaders(pluginSources, discoveryMode);
     }
 
-    private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
-        PluginScanResult reflectiveScanResult = new 
ReflectionScanner().discoverPlugins(pluginSources);
-        delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
-        return reflectiveScanResult;
+    public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
+        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult serviceLoadingScanResult;
+        try {
+            serviceLoadingScanResult = discoveryMode.serviceLoad() ?
+                    new ServiceLoaderScanner().discoverPlugins(pluginSources) 
: empty;
+        } catch (Throwable t) {
+            throw new ConnectException(String.format(
+                    "Unable to perform ServiceLoader scanning as requested by 
%s=%s. It may be possible to fix this issue by reconfiguring %s=%s",
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode,
+                    WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN), t);
+        }
+        PluginScanResult reflectiveScanResult = 
discoveryMode.reflectivelyScan() ?
+                new ReflectionScanner().discoverPlugins(pluginSources) : empty;
+        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
+        delegatingLoader.installDiscoveredPlugins(scanResult);
+        return scanResult;
+    }
+
+    // visible for testing
+    static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode 
discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult 
mergedResult) {
+        SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>();
+        mergedResult.forEach(missingPlugins::add);
+        serviceLoadingScanResult.forEach(missingPlugins::remove);
+        if (missingPlugins.isEmpty()) {
+            if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN || 
discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) {
+                log.warn("All plugins have ServiceLoader manifests, consider 
reconfiguring {}={}",
+                        WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.SERVICE_LOAD);
+            }
+        } else {
+            String message = String.format(
+                "One or more plugins are missing ServiceLoader manifests may 
not be usable with %s=%s: %s%n" +
+                        "Read the documentation at %s for instructions on 
migrating your plugins " +
+                        "to take advantage of the performance improvements of 
%s mode.",
+                            WorkerConfig.PLUGIN_DISCOVERY_CONFIG,
+                    PluginDiscoveryMode.SERVICE_LOAD,
+                    missingPlugins.stream()
+                            .map(pluginDesc -> pluginDesc.location() + "\t" + 
pluginDesc.className() + "\t" + pluginDesc.type() + "\t" + pluginDesc.version())
+                            .collect(Collectors.joining("\n", "[\n", "\n]")),
+                    
"https://kafka.apache.org/documentation.html#connect_plugindiscovery";,
+                    PluginDiscoveryMode.SERVICE_LOAD
+            );
+            if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN) {
+                log.warn("{} To silence this warning, set {}={} in the worker 
config.",
+                        message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.ONLY_SCAN);
+            } else if (discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) {
+                throw new ConnectException(String.format("%s To silence this 
error, set %s=%s in the worker config.",
+                        message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, 
PluginDiscoveryMode.HYBRID_WARN));
+            }
+        }
     }
 
     private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..170043754d5
--- /dev/null
+++ 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,17 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.tools.MockSinkConnector
+org.apache.kafka.connect.tools.VerifiableSinkConnector
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..acc2ddce718
--- /dev/null
+++ 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,18 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.tools.MockSourceConnector
+org.apache.kafka.connect.tools.SchemaSourceConnector
+org.apache.kafka.connect.tools.VerifiableSourceConnector
\ No newline at end of file
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..134262474b9
--- /dev/null
+++ 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,21 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.converters.ByteArrayConverter
+org.apache.kafka.connect.converters.DoubleConverter
+org.apache.kafka.connect.converters.FloatConverter
+org.apache.kafka.connect.converters.IntegerConverter
+org.apache.kafka.connect.converters.LongConverter
+org.apache.kafka.connect.converters.ShortConverter
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..134262474b9
--- /dev/null
+++ 
b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,21 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.converters.ByteArrayConverter
+org.apache.kafka.connect.converters.DoubleConverter
+org.apache.kafka.connect.converters.FloatConverter
+org.apache.kafka.connect.converters.IntegerConverter
+org.apache.kafka.connect.converters.LongConverter
+org.apache.kafka.connect.converters.ShortConverter
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 75418b27a6f..189d75a842a 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.provider.ConfigProvider;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import 
org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.converters.ByteArrayConverter;
@@ -46,6 +47,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin;
 import org.apache.kafka.connect.runtime.rest.RestServerConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
@@ -58,6 +60,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
@@ -76,6 +79,9 @@ public class PluginsTest {
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
     private TestInternalConverter internalConverter;
+    private PluginScanResult nonEmpty;
+    private PluginScanResult empty;
+    private String missingPluginClass;
 
     @Before
     public void setup() {
@@ -94,6 +100,22 @@ public class PluginsTest {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, 
TestHeaderConverter.class.getName());
         props.put("header.converter.extra.config", "baz");
 
+        // Set up some PluginScanResult instances to test the plugin discovery 
modes
+        SortedSet<PluginDesc<SinkConnector>> sinkConnectors = 
(SortedSet<PluginDesc<SinkConnector>>) plugins.sinkConnectors();
+        missingPluginClass = sinkConnectors.first().className();
+        nonEmpty = new PluginScanResult(
+                sinkConnectors,
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet(),
+                Collections.emptySortedSet()
+        );
+        empty = new PluginScanResult(Collections.emptyList());
+
         createConfig();
     }
 
@@ -476,6 +498,102 @@ public class PluginsTest {
         }
     }
 
+    @Test
+    public void testOnlyScanNoPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, 
empty);
+            assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> 
e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
+        }
+    }
+
+    @Test
+    public void testOnlyScanWithPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, 
nonEmpty);
+            assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> 
e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
+        }
+    }
+
+    @Test
+    public void testHybridWarnNoPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, 
empty);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
+                    e.getLevel().equals("WARN")
+                            // These log messages must contain the config 
name, it is referenced in the documentation.
+                            && 
e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
+            ));
+        }
+    }
+
+    @Test
+    public void testHybridWarnWithPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, 
nonEmpty, nonEmpty);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
+                    e.getLevel().equals("WARN")
+                            && !e.getMessage().contains(missingPluginClass)
+                            && 
e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
+            ));
+        }
+    }
+
+    @Test
+    public void testHybridWarnMissingPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, 
nonEmpty);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
+                    e.getLevel().equals("WARN")
+                            && e.getMessage().contains(missingPluginClass)
+                            && 
e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
+            ));
+        }
+    }
+
+    @Test
+    public void testHybridFailNoPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, 
empty);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
+                    e.getLevel().equals("WARN")
+                            && 
e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
+            ));
+        }
+    }
+
+    @Test
+    public void testHybridFailWithPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, 
nonEmpty, nonEmpty);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e ->
+                    e.getLevel().equals("WARN")
+                            && !e.getMessage().contains(missingPluginClass)
+                            && 
e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG)
+            ));
+        }
+    }
+
+    @Test
+    public void testHybridFailMissingPlugins() {
+        assertThrows(ConnectException.class, () -> 
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, 
nonEmpty));
+    }
+
+    @Test
+    public void testServiceLoadNoPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, 
empty, empty);
+            assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> 
e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
+        }
+    }
+
+    @Test
+    public void testServiceLoadWithPlugins() {
+        try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(Plugins.class)) {
+            
Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, 
nonEmpty, nonEmpty);
+            assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> 
e.getLevel().contains("ERROR") || e.getLevel().equals("WARN")));
+        }
+    }
+
     private void assertClassLoaderReadsVersionFromResource(
             TestPlugin parentResource, TestPlugin childResource, String 
className, String... expectedVersions) {
         URL[] systemPath = TestPlugins.pluginPath(parentResource)
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index c7b884c3605..16b4d3a0e9f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -44,13 +44,13 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -239,7 +239,8 @@ public class SynchronizationTest {
         };
 
         // THREAD 2: loads a class by delegating upward starting from the 
PluginClassLoader
-        String t2Class = JsonConverter.class.getName();
+        // Use any non-plugin class that no plugins depend on, so that the 
class isn't loaded during plugin discovery
+        String t2Class = Mockito.class.getName();
         // PluginClassLoader breakpoint will only trigger on this thread
         pclBreakpoint.set(t2Class::equals);
         Runnable thread2 = () -> {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 36e0fc765a0..4f0c4369f89 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -57,6 +57,7 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;
@@ -276,6 +277,7 @@ public class EmbeddedConnectCluster {
         putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, 
internalTopicsReplFactor);
         putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
         putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");
 
         for (int i = 0; i < numInitialWorkers; i++) {
             addWorker();
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
new file mode 100644
index 00000000000..4c26fece184
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -0,0 +1,20 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector
+org.apache.kafka.connect.integration.ErrantRecordSinkConnector
+org.apache.kafka.connect.integration.MonitorableSinkConnector
+org.apache.kafka.connect.runtime.SampleSinkConnector
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
new file mode 100644
index 00000000000..73033ca23c0
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
@@ -0,0 +1,25 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector
+org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector
+org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$NaughtyConnector
+org.apache.kafka.connect.integration.MonitorableSourceConnector
+org.apache.kafka.connect.runtime.SampleSourceConnector
+org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResourceTest$ConnectorPluginsResourceTestConnector
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
new file mode 100644
index 00000000000..6d38aebee3d
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter
@@ -0,0 +1,20 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.runtime.SampleConverterWithHeaders
+org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
+org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
new file mode 100644
index 00000000000..a5b008543b1
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter
@@ -0,0 +1,20 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.runtime.SampleHeaderConverter
+org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter
+org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
+org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter
\ No newline at end of file
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
new file mode 100644
index 00000000000..6d36ee90888
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
@@ -0,0 +1,23 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.integration.ErrorHandlingIntegrationTest$FaultyPassthrough
+org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyPassthrough
+org.apache.kafka.connect.runtime.ConnectorConfigTest$SimpleTransformation
+org.apache.kafka.connect.runtime.ConnectorConfigTest$HasDuplicateConfigTransformation
+org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Key
+org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Value
+org.apache.kafka.connect.runtime.SampleTransformation
+org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$Colliding
diff --git 
a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
new file mode 100644
index 00000000000..b235b1fec51
--- /dev/null
+++ 
b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
@@ -0,0 +1,17 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.runtime.ConnectorConfigTest$TestPredicate
+org.apache.kafka.connect.runtime.SamplePredicate
\ No newline at end of file
diff --git 
a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
 
b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
new file mode 100644
index 00000000000..cf9646be376
--- /dev/null
+++ 
b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation
@@ -0,0 +1,41 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.transforms.Cast$Key
+org.apache.kafka.connect.transforms.Cast$Value
+org.apache.kafka.connect.transforms.DropHeaders
+org.apache.kafka.connect.transforms.ExtractField$Key
+org.apache.kafka.connect.transforms.ExtractField$Value
+org.apache.kafka.connect.transforms.Filter
+org.apache.kafka.connect.transforms.Flatten$Key
+org.apache.kafka.connect.transforms.Flatten$Value
+org.apache.kafka.connect.transforms.HeaderFrom$Key
+org.apache.kafka.connect.transforms.HeaderFrom$Value
+org.apache.kafka.connect.transforms.HoistField$Key
+org.apache.kafka.connect.transforms.HoistField$Value
+org.apache.kafka.connect.transforms.InsertField$Key
+org.apache.kafka.connect.transforms.InsertField$Value
+org.apache.kafka.connect.transforms.InsertHeader
+org.apache.kafka.connect.transforms.MaskField$Key
+org.apache.kafka.connect.transforms.MaskField$Value
+org.apache.kafka.connect.transforms.RegexRouter
+org.apache.kafka.connect.transforms.ReplaceField$Key
+org.apache.kafka.connect.transforms.ReplaceField$Value
+org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
+org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
+org.apache.kafka.connect.transforms.TimestampConverter$Key
+org.apache.kafka.connect.transforms.TimestampConverter$Value
+org.apache.kafka.connect.transforms.TimestampRouter
+org.apache.kafka.connect.transforms.ValueToKey
diff --git 
a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
 
b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
new file mode 100644
index 00000000000..b451672377b
--- /dev/null
+++ 
b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate
@@ -0,0 +1,18 @@
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+
+org.apache.kafka.connect.transforms.predicates.HasHeaderKey
+org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
+org.apache.kafka.connect.transforms.predicates.TopicNameMatches
\ No newline at end of file

Reply via email to