This is an automated email from the ASF dual-hosted git repository. gharris pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ff4fed5cbe0 KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) (#14055) ff4fed5cbe0 is described below commit ff4fed5cbe01480f671cc5070fac97d16ca0a5ba Author: Greg Harris <greg.har...@aiven.io> AuthorDate: Tue Aug 8 10:06:35 2023 -0700 KAFKA-15031: Add plugin.discovery to Connect worker configuration (KIP-898) (#14055) Reviewers: Chris Egerton <chr...@aiven.io> --- .../org.apache.kafka.connect.storage.Converter | 16 +++ ...rg.apache.kafka.connect.storage.HeaderConverter | 17 +++ .../org.apache.kafka.connect.sink.SinkConnector | 16 +++ ...org.apache.kafka.connect.source.SourceConnector | 16 +++ .../org.apache.kafka.connect.sink.SinkConnector | 16 +++ ...org.apache.kafka.connect.source.SourceConnector | 16 +++ .../org.apache.kafka.connect.storage.Converter | 16 +++ ...rg.apache.kafka.connect.storage.HeaderConverter | 16 +++ ...org.apache.kafka.connect.source.SourceConnector | 18 ++++ .../apache/kafka/connect/runtime/WorkerConfig.java | 35 ++++++ .../runtime/isolation/PluginDiscoveryMode.java | 65 ++++++++++++ .../kafka/connect/runtime/isolation/Plugins.java | 62 ++++++++++- .../org.apache.kafka.connect.sink.SinkConnector | 17 +++ ...org.apache.kafka.connect.source.SourceConnector | 18 ++++ .../org.apache.kafka.connect.storage.Converter | 21 ++++ ...rg.apache.kafka.connect.storage.HeaderConverter | 21 ++++ .../connect/runtime/isolation/PluginsTest.java | 118 +++++++++++++++++++++ .../runtime/isolation/SynchronizationTest.java | 5 +- .../util/clusters/EmbeddedConnectCluster.java | 2 + .../org.apache.kafka.connect.sink.SinkConnector | 20 ++++ ...org.apache.kafka.connect.source.SourceConnector | 25 +++++ .../org.apache.kafka.connect.storage.Converter | 20 ++++ ...rg.apache.kafka.connect.storage.HeaderConverter | 20 ++++ ....apache.kafka.connect.transforms.Transformation | 23 ++++ ...e.kafka.connect.transforms.predicates.Predicate | 17 +++ ....apache.kafka.connect.transforms.Transformation | 41 +++++++ ...e.kafka.connect.transforms.predicates.Predicate | 18 ++++ 27 files changed, 688 insertions(+), 7 deletions(-) diff --git a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..78e322373bb --- /dev/null +++ b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.storage.StringConverter \ No newline at end of file diff --git a/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 00000000000..42e02d13aaf --- /dev/null +++ b/connect/api/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.storage.SimpleHeaderConverter +org.apache.kafka.connect.storage.StringConverter \ No newline at end of file diff --git a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..4a1f3a9baf2 --- /dev/null +++ b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.sink.SinkConnectorTest$TestSinkConnector \ No newline at end of file diff --git a/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 00000000000..ab7f14d0aeb --- /dev/null +++ b/connect/api/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.source.SourceConnectorTest$TestSourceConnector \ No newline at end of file diff --git a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..4acecd76b5a --- /dev/null +++ b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.file.FileStreamSinkConnector \ No newline at end of file diff --git a/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 00000000000..66a0c5d8588 --- /dev/null +++ b/connect/file/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.file.FileStreamSourceConnector \ No newline at end of file diff --git a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..0ea37b79c01 --- /dev/null +++ b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.json.JsonConverter \ No newline at end of file diff --git a/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 00000000000..0ea37b79c01 --- /dev/null +++ b/connect/json/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.json.JsonConverter \ No newline at end of file diff --git a/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 00000000000..4836e08f3e6 --- /dev/null +++ b/connect/mirror/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.mirror.MirrorCheckpointConnector +org.apache.kafka.connect.mirror.MirrorHeartbeatConnector +org.apache.kafka.connect.mirror.MirrorSourceConnector \ No newline at end of file diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 6ba7cea7406..c1fd69317bc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -25,7 +25,9 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode; import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.slf4j.Logger; @@ -35,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutionException; @@ -42,6 +45,10 @@ import java.util.concurrent.ExecutionException; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_PREFIX; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.ONLY_SCAN; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_WARN; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.HYBRID_FAIL; +import static org.apache.kafka.connect.runtime.isolation.PluginDiscoveryMode.SERVICE_LOAD; /** * Common base class providing configuration for Kafka Connect workers, whether standalone or distributed. @@ -122,6 +129,18 @@ public class WorkerConfig extends AbstractConfig { + "by the worker's scanner before config providers are initialized and used to " + "replace variables."; + public static final String PLUGIN_DISCOVERY_CONFIG = "plugin.discovery"; + protected static final String PLUGIN_DISCOVERY_DOC = "Method to use to discover plugins present in the classpath " + + "and plugin.path configuration. This can be one of multiple values with the following meanings:\n" + + "* " + ONLY_SCAN + ": Discover plugins only by reflection. " + + "Plugins which are not discoverable by ServiceLoader will not impact worker startup.\n" + + "* " + HYBRID_WARN + ": Discover plugins reflectively and by ServiceLoader. " + + "Plugins which are not discoverable by ServiceLoader will print warnings during worker startup.\n" + + "* " + HYBRID_FAIL + ": Discover plugins reflectively and by ServiceLoader. " + + "Plugins which are not discoverable by ServiceLoader will cause worker startup to fail.\n" + + "* " + SERVICE_LOAD + ": Discover plugins only by ServiceLoader. Faster startup than other modes. " + + "Plugins which are not discoverable by ServiceLoader may not be usable."; + public static final String CONFIG_PROVIDERS_CONFIG = "config.providers"; protected static final String CONFIG_PROVIDERS_DOC = "Comma-separated names of <code>ConfigProvider</code> classes, loaded and used " @@ -199,6 +218,12 @@ public class WorkerConfig extends AbstractConfig { null, Importance.LOW, PLUGIN_PATH_DOC) + .define(PLUGIN_DISCOVERY_CONFIG, + Type.STRING, + PluginDiscoveryMode.HYBRID_WARN.toString(), + ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(PluginDiscoveryMode.class)), + Importance.LOW, + PLUGIN_DISCOVERY_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, atLeast(0), Importance.LOW, CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) @@ -401,6 +426,16 @@ public class WorkerConfig extends AbstractConfig { return props.get(WorkerConfig.PLUGIN_PATH_CONFIG); } + public static PluginDiscoveryMode pluginDiscovery(Map<String, String> props) { + String value = props.getOrDefault(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN.toString()); + try { + return PluginDiscoveryMode.valueOf(value.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new ConnectException("Invalid " + PLUGIN_DISCOVERY_CONFIG + " value, must be one of " + + Arrays.toString(Utils.enumOptions(PluginDiscoveryMode.class))); + } + } + public WorkerConfig(ConfigDef definition, Map<String, String> props) { super(definition, props); logInternalConverterRemovalWarnings(props); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java new file mode 100644 index 00000000000..bd390a929b4 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDiscoveryMode.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Locale; + +/** + * Strategy to use to discover plugins usable on a Connect worker. + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-898%3A+Modernize+Connect+plugin+discovery">KIP-898</a> + */ +public enum PluginDiscoveryMode { + + /** + * Scan for plugins reflectively. This corresponds to the legacy behavior of Connect prior to KIP-898. + * <p>Note: the following plugins are still loaded using {@link java.util.ServiceLoader} in this mode: + * <ul> + * <li>{@link org.apache.kafka.common.config.provider.ConfigProvider}</li> + * <li>{@link org.apache.kafka.connect.rest.ConnectRestExtension}</li> + * <li>{@link org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}</li> + * </ul> + */ + ONLY_SCAN, + /** + * Scan for plugins reflectively and via {@link java.util.ServiceLoader}. + * Emit warnings if one or more plugins is not available via {@link java.util.ServiceLoader} + */ + HYBRID_WARN, + /** + * Scan for plugins reflectively and via {@link java.util.ServiceLoader}. + * Fail worker during startup if one or more plugins is not available via {@link java.util.ServiceLoader} + */ + HYBRID_FAIL, + /** + * Discover plugins via {@link java.util.ServiceLoader} only. + * Plugins may not be usable if they are not available via {@link java.util.ServiceLoader} + */ + SERVICE_LOAD; + + public boolean reflectivelyScan() { + return this != SERVICE_LOAD; + } + + public boolean serviceLoad() { + return this != ONLY_SCAN; + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 36e20270abc..72fe40a50a5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -39,11 +39,15 @@ import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; public class Plugins { @@ -63,16 +67,64 @@ public class Plugins { // VisibleForTesting Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); + PluginDiscoveryMode discoveryMode = WorkerConfig.pluginDiscovery(props); Set<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = factory.newDelegatingClassLoader(parent); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); - scanResult = initLoaders(pluginSources); + scanResult = initLoaders(pluginSources, discoveryMode); } - private PluginScanResult initLoaders(Set<PluginSource> pluginSources) { - PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources); - delegatingLoader.installDiscoveredPlugins(reflectiveScanResult); - return reflectiveScanResult; + public PluginScanResult initLoaders(Set<PluginSource> pluginSources, PluginDiscoveryMode discoveryMode) { + PluginScanResult empty = new PluginScanResult(Collections.emptyList()); + PluginScanResult serviceLoadingScanResult; + try { + serviceLoadingScanResult = discoveryMode.serviceLoad() ? + new ServiceLoaderScanner().discoverPlugins(pluginSources) : empty; + } catch (Throwable t) { + throw new ConnectException(String.format( + "Unable to perform ServiceLoader scanning as requested by %s=%s. It may be possible to fix this issue by reconfiguring %s=%s", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, discoveryMode, + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN), t); + } + PluginScanResult reflectiveScanResult = discoveryMode.reflectivelyScan() ? + new ReflectionScanner().discoverPlugins(pluginSources) : empty; + PluginScanResult scanResult = new PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult)); + maybeReportHybridDiscoveryIssue(discoveryMode, serviceLoadingScanResult, scanResult); + delegatingLoader.installDiscoveredPlugins(scanResult); + return scanResult; + } + + // visible for testing + static void maybeReportHybridDiscoveryIssue(PluginDiscoveryMode discoveryMode, PluginScanResult serviceLoadingScanResult, PluginScanResult mergedResult) { + SortedSet<PluginDesc<?>> missingPlugins = new TreeSet<>(); + mergedResult.forEach(missingPlugins::add); + serviceLoadingScanResult.forEach(missingPlugins::remove); + if (missingPlugins.isEmpty()) { + if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN || discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) { + log.warn("All plugins have ServiceLoader manifests, consider reconfiguring {}={}", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.SERVICE_LOAD); + } + } else { + String message = String.format( + "One or more plugins are missing ServiceLoader manifests may not be usable with %s=%s: %s%n" + + "Read the documentation at %s for instructions on migrating your plugins " + + "to take advantage of the performance improvements of %s mode.", + WorkerConfig.PLUGIN_DISCOVERY_CONFIG, + PluginDiscoveryMode.SERVICE_LOAD, + missingPlugins.stream() + .map(pluginDesc -> pluginDesc.location() + "\t" + pluginDesc.className() + "\t" + pluginDesc.type() + "\t" + pluginDesc.version()) + .collect(Collectors.joining("\n", "[\n", "\n]")), + "https://kafka.apache.org/documentation.html#connect_plugindiscovery", + PluginDiscoveryMode.SERVICE_LOAD + ); + if (discoveryMode == PluginDiscoveryMode.HYBRID_WARN) { + log.warn("{} To silence this warning, set {}={} in the worker config.", + message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.ONLY_SCAN); + } else if (discoveryMode == PluginDiscoveryMode.HYBRID_FAIL) { + throw new ConnectException(String.format("%s To silence this error, set %s=%s in the worker config.", + message, WorkerConfig.PLUGIN_DISCOVERY_CONFIG, PluginDiscoveryMode.HYBRID_WARN)); + } + } } private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) { diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..170043754d5 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.tools.MockSinkConnector +org.apache.kafka.connect.tools.VerifiableSinkConnector diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 00000000000..acc2ddce718 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.tools.MockSourceConnector +org.apache.kafka.connect.tools.SchemaSourceConnector +org.apache.kafka.connect.tools.VerifiableSourceConnector \ No newline at end of file diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..134262474b9 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,21 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter +org.apache.kafka.connect.converters.DoubleConverter +org.apache.kafka.connect.converters.FloatConverter +org.apache.kafka.connect.converters.IntegerConverter +org.apache.kafka.connect.converters.LongConverter +org.apache.kafka.connect.converters.ShortConverter diff --git a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 00000000000..134262474b9 --- /dev/null +++ b/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,21 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.converters.ByteArrayConverter +org.apache.kafka.connect.converters.DoubleConverter +org.apache.kafka.connect.converters.FloatConverter +org.apache.kafka.connect.converters.IntegerConverter +org.apache.kafka.connect.converters.LongConverter +org.apache.kafka.connect.converters.ShortConverter diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 75418b27a6f..189d75a842a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.converters.ByteArrayConverter; @@ -46,6 +47,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; import org.apache.kafka.connect.runtime.isolation.TestPlugins.TestPlugin; import org.apache.kafka.connect.runtime.rest.RestServerConfig; +import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; @@ -58,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -76,6 +79,9 @@ public class PluginsTest { private TestConverter converter; private TestHeaderConverter headerConverter; private TestInternalConverter internalConverter; + private PluginScanResult nonEmpty; + private PluginScanResult empty; + private String missingPluginClass; @Before public void setup() { @@ -94,6 +100,22 @@ public class PluginsTest { props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName()); props.put("header.converter.extra.config", "baz"); + // Set up some PluginScanResult instances to test the plugin discovery modes + SortedSet<PluginDesc<SinkConnector>> sinkConnectors = (SortedSet<PluginDesc<SinkConnector>>) plugins.sinkConnectors(); + missingPluginClass = sinkConnectors.first().className(); + nonEmpty = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + empty = new PluginScanResult(Collections.emptyList()); + createConfig(); } @@ -476,6 +498,102 @@ public class PluginsTest { } } + @Test + public void testOnlyScanNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testOnlyScanWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.ONLY_SCAN, empty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testHybridWarnNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + // These log messages must contain the config name, it is referenced in the documentation. + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); + } + } + + @Test + public void testHybridWarnWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && !e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); + } + } + + @Test + public void testHybridWarnMissingPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_WARN, empty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); + } + } + + @Test + public void testHybridFailNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); + } + } + + @Test + public void testHybridFailWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> + e.getLevel().equals("WARN") + && !e.getMessage().contains(missingPluginClass) + && e.getMessage().contains(WorkerConfig.PLUGIN_DISCOVERY_CONFIG) + )); + } + } + + @Test + public void testHybridFailMissingPlugins() { + assertThrows(ConnectException.class, () -> Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.HYBRID_FAIL, empty, nonEmpty)); + } + + @Test + public void testServiceLoadNoPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, empty, empty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + + @Test + public void testServiceLoadWithPlugins() { + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Plugins.class)) { + Plugins.maybeReportHybridDiscoveryIssue(PluginDiscoveryMode.SERVICE_LOAD, nonEmpty, nonEmpty); + assertTrue(logCaptureAppender.getEvents().stream().noneMatch(e -> e.getLevel().contains("ERROR") || e.getLevel().equals("WARN"))); + } + } + private void assertClassLoaderReadsVersionFromResource( TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) { URL[] systemPath = TestPlugins.pluginPath(parentResource) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java index c7b884c3605..16b4d3a0e9f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java @@ -44,13 +44,13 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.WorkerConfig; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -239,7 +239,8 @@ public class SynchronizationTest { }; // THREAD 2: loads a class by delegating upward starting from the PluginClassLoader - String t2Class = JsonConverter.class.getName(); + // Use any non-plugin class that no plugins depend on, so that the class isn't loaded during plugin discovery + String t2Class = Mockito.class.getName(); // PluginClassLoader breakpoint will only trigger on this thread pclBreakpoint.set(t2Class::equals); Runnable thread2 = () -> { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 36e0fc765a0..4f0c4369f89 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -57,6 +57,7 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG; @@ -276,6 +277,7 @@ public class EmbeddedConnectCluster { putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor); putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + putIfAbsent(workerProps, PLUGIN_DISCOVERY_CONFIG, "hybrid_fail"); for (int i = 0; i < numInitialWorkers; i++) { addWorker(); diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector new file mode 100644 index 00000000000..4c26fece184 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector @@ -0,0 +1,20 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector +org.apache.kafka.connect.integration.ErrantRecordSinkConnector +org.apache.kafka.connect.integration.MonitorableSinkConnector +org.apache.kafka.connect.runtime.SampleSinkConnector \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector new file mode 100644 index 00000000000..73033ca23c0 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector @@ -0,0 +1,25 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$InitializeBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$ConfigBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$ValidateBlockingConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSourceConnector +org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSourceConnector +org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest$NaughtyConnector +org.apache.kafka.connect.integration.MonitorableSourceConnector +org.apache.kafka.connect.runtime.SampleSourceConnector +org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResourceTest$ConnectorPluginsResourceTestConnector diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter new file mode 100644 index 00000000000..6d38aebee3d --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.Converter @@ -0,0 +1,20 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.SampleConverterWithHeaders +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter new file mode 100644 index 00000000000..a5b008543b1 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.storage.HeaderConverter @@ -0,0 +1,20 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.SampleHeaderConverter +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter +org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter \ No newline at end of file diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation new file mode 100644 index 00000000000..6d36ee90888 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation @@ -0,0 +1,23 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.integration.ErrorHandlingIntegrationTest$FaultyPassthrough +org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyPassthrough +org.apache.kafka.connect.runtime.ConnectorConfigTest$SimpleTransformation +org.apache.kafka.connect.runtime.ConnectorConfigTest$HasDuplicateConfigTransformation +org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Key +org.apache.kafka.connect.runtime.ConnectorConfigTest$AbstractKeyValueTransformation$Value +org.apache.kafka.connect.runtime.SampleTransformation +org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$Colliding diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate new file mode 100644 index 00000000000..b235b1fec51 --- /dev/null +++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate @@ -0,0 +1,17 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.runtime.ConnectorConfigTest$TestPredicate +org.apache.kafka.connect.runtime.SamplePredicate \ No newline at end of file diff --git a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation new file mode 100644 index 00000000000..cf9646be376 --- /dev/null +++ b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.Transformation @@ -0,0 +1,41 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.transforms.Cast$Key +org.apache.kafka.connect.transforms.Cast$Value +org.apache.kafka.connect.transforms.DropHeaders +org.apache.kafka.connect.transforms.ExtractField$Key +org.apache.kafka.connect.transforms.ExtractField$Value +org.apache.kafka.connect.transforms.Filter +org.apache.kafka.connect.transforms.Flatten$Key +org.apache.kafka.connect.transforms.Flatten$Value +org.apache.kafka.connect.transforms.HeaderFrom$Key +org.apache.kafka.connect.transforms.HeaderFrom$Value +org.apache.kafka.connect.transforms.HoistField$Key +org.apache.kafka.connect.transforms.HoistField$Value +org.apache.kafka.connect.transforms.InsertField$Key +org.apache.kafka.connect.transforms.InsertField$Value +org.apache.kafka.connect.transforms.InsertHeader +org.apache.kafka.connect.transforms.MaskField$Key +org.apache.kafka.connect.transforms.MaskField$Value +org.apache.kafka.connect.transforms.RegexRouter +org.apache.kafka.connect.transforms.ReplaceField$Key +org.apache.kafka.connect.transforms.ReplaceField$Value +org.apache.kafka.connect.transforms.SetSchemaMetadata$Key +org.apache.kafka.connect.transforms.SetSchemaMetadata$Value +org.apache.kafka.connect.transforms.TimestampConverter$Key +org.apache.kafka.connect.transforms.TimestampConverter$Value +org.apache.kafka.connect.transforms.TimestampRouter +org.apache.kafka.connect.transforms.ValueToKey diff --git a/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate new file mode 100644 index 00000000000..b451672377b --- /dev/null +++ b/connect/transforms/src/main/resources/META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate @@ -0,0 +1,18 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +org.apache.kafka.connect.transforms.predicates.HasHeaderKey +org.apache.kafka.connect.transforms.predicates.RecordIsTombstone +org.apache.kafka.connect.transforms.predicates.TopicNameMatches \ No newline at end of file