KAFKA-3487: Support classloading isolation in Connect (KIP-146) Author: Konstantine Karantasis <[email protected]>
Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #3028 from kkonstantine/KAFKA-3487-Support-classloading-isolation-in-Connect Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45f22617 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45f22617 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45f22617 Branch: refs/heads/trunk Commit: 45f2261763eac5caaebf860daab32ef5337c9293 Parents: 5aaaba7 Author: Konstantine Karantasis <[email protected]> Authored: Thu May 18 10:39:15 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu May 18 10:39:15 2017 -0700 ---------------------------------------------------------------------- build.gradle | 1 + checkstyle/import-control.xml | 5 + checkstyle/suppressions.xml | 2 +- config/connect-distributed.properties | 10 + config/connect-standalone.properties | 11 + .../kafka/connect/cli/ConnectDistributed.java | 7 +- .../kafka/connect/cli/ConnectStandalone.java | 7 +- .../kafka/connect/runtime/AbstractHerder.java | 91 +++--- .../kafka/connect/runtime/ConnectorConfig.java | 67 ++++- .../apache/kafka/connect/runtime/Herder.java | 9 +- .../kafka/connect/runtime/PluginDiscovery.java | 126 -------- .../connect/runtime/SinkConnectorConfig.java | 10 +- .../connect/runtime/SourceConnectorConfig.java | 5 +- .../apache/kafka/connect/runtime/Worker.java | 173 ++++++++--- .../kafka/connect/runtime/WorkerConfig.java | 31 +- .../kafka/connect/runtime/WorkerSinkTask.java | 3 +- .../kafka/connect/runtime/WorkerSourceTask.java | 3 +- .../kafka/connect/runtime/WorkerTask.java | 12 +- .../runtime/distributed/DistributedHerder.java | 4 +- .../isolation/DelegatingClassLoader.java | 299 +++++++++++++++++++ .../runtime/isolation/PluginClassLoader.java | 68 +++++ .../connect/runtime/isolation/PluginDesc.java | 110 +++++++ .../runtime/isolation/PluginScanResult.java | 55 ++++ .../connect/runtime/isolation/PluginType.java | 58 ++++ .../connect/runtime/isolation/PluginUtils.java | 147 +++++++++ .../connect/runtime/isolation/Plugins.java | 217 ++++++++++++++ .../rest/entities/ConnectorPluginInfo.java | 37 +-- .../resources/ConnectorPluginsResource.java | 40 ++- .../runtime/standalone/StandaloneHerder.java | 4 +- .../connect/runtime/ConnectorConfigTest.java | 27 +- .../connect/runtime/WorkerConnectorTest.java | 30 +- .../connect/runtime/WorkerSinkTaskTest.java | 8 +- .../runtime/WorkerSinkTaskThreadedTest.java | 7 +- .../connect/runtime/WorkerSourceTaskTest.java | 5 +- .../kafka/connect/runtime/WorkerTaskTest.java | 30 +- .../kafka/connect/runtime/WorkerTest.java | 224 ++++++++++++-- .../distributed/DistributedHerderTest.java | 67 +++-- .../runtime/isolation/PluginUtilsTest.java | 127 ++++++++ .../resources/ConnectorPluginsResourceTest.java | 128 ++++++-- .../standalone/StandaloneHerderTest.java | 72 +++-- gradle/dependencies.gradle | 4 +- 41 files changed, 1937 insertions(+), 404 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 8d6e703..1693723 100644 --- a/build.gradle +++ b/build.gradle @@ -1056,6 +1056,7 @@ project(':connect:runtime') { compile libs.jettyServlet compile libs.jettyServlets compile(libs.reflections) + compile(libs.mavenArtifact) testCompile project(':clients').sourceSets.test.output testCompile libs.easymock http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 21d9d3c..7f51979 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -254,6 +254,11 @@ <allow pkg="org.glassfish.jersey" /> <allow pkg="com.fasterxml.jackson" /> </subpackage> + + <subpackage name="isolation"> + <allow pkg="com.fasterxml.jackson" /> + <allow pkg="org.apache.maven.artifact.versioning" /> + </subpackage> </subpackage> <subpackage name="cli"> http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index dc00bee..3b865bc 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -66,7 +66,7 @@ <!-- Connect --> <suppress checks="ClassFanOutComplexity" - files="DistributedHerder.java"/> + files="DistributedHerder(|Test).java"/> <suppress checks="MethodLength" files="(KafkaConfigBackingStore|RequestResponseTest).java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-distributed.properties ---------------------------------------------------------------------- diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties index b0092bb..752e1f5 100644 --- a/config/connect-distributed.properties +++ b/config/connect-distributed.properties @@ -58,3 +58,13 @@ offset.flush.interval.ms=10000 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers. #rest.advertised.host.name= #rest.advertised.port= + +# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins +# (connectors, converters, transformations). The list should consist of top level directories that include +# any combination of: +# a) directories immediately containing jars with plugins and their dependencies +# b) uber-jars with plugins and their dependencies +# c) directories immediately containing the package directory structure of classes of plugins and their dependencies +# Examples: +# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, +#plugin.path= http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/config/connect-standalone.properties ---------------------------------------------------------------------- diff --git a/config/connect-standalone.properties b/config/connect-standalone.properties index 8760590..0039796 100644 --- a/config/connect-standalone.properties +++ b/config/connect-standalone.properties @@ -35,3 +35,14 @@ internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 + +# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins +# (connectors, converters, transformations). The list should consist of top level directories that include +# any combination of: +# a) directories immediately containing jars with plugins and their dependencies +# b) uber-jars with plugins and their dependencies +# c) directories immediately containing the package directory structure of classes of plugins and their dependencies +# Note: symlinks will be followed to discover dependencies or plugins. +# Examples: +# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, +#plugin.path= http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index fb3d693..717ccd9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -20,10 +20,10 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; -import org.apache.kafka.connect.runtime.ConnectorFactory; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.KafkaConfigBackingStore; @@ -60,7 +60,8 @@ public class ConnectDistributed { Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); Time time = Time.SYSTEM; - ConnectorFactory connectorFactory = new ConnectorFactory(); + Plugins plugins = new Plugins(workerProps); + plugins.compareAndSwapWithDelegatingLoader(); DistributedConfig config = new DistributedConfig(workerProps); RestServer rest = new RestServer(config); @@ -70,7 +71,7 @@ public class ConnectDistributed { KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); offsetBackingStore.configure(config); - Worker worker = new Worker(workerId, time, connectorFactory, config, offsetBackingStore); + Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter()); statusBackingStore.configure(config); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index 0465048..c6d0e59 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Connect; import org.apache.kafka.connect.runtime.ConnectorConfig; -import org.apache.kafka.connect.runtime.ConnectorFactory; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; @@ -65,14 +65,15 @@ public class ConnectStandalone { Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); Time time = Time.SYSTEM; - ConnectorFactory connectorFactory = new ConnectorFactory(); + Plugins plugins = new Plugins(workerProps); + plugins.compareAndSwapWithDelegatingLoader(); StandaloneConfig config = new StandaloneConfig(workerProps); RestServer rest = new RestServer(config); URI advertisedUrl = rest.advertisedUrl(); String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - Worker worker = new Worker(workerId, time, connectorFactory, config, new FileOffsetBackingStore()); + Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore()); Herder herder = new StandaloneHerder(worker); final Connect connect = new Connect(herder, rest); http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index fb286e2..6293b01 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -20,9 +20,11 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; @@ -78,7 +80,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con protected final ConfigBackingStore configBackingStore; private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); - private Thread classPathTraverser; public AbstractHerder(Worker worker, String workerId, @@ -96,20 +97,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con this.worker.start(); this.statusBackingStore.start(); this.configBackingStore.start(); - traverseClassPath(); } protected void stopServices() { this.statusBackingStore.stop(); this.configBackingStore.stop(); this.worker.stop(); - if (this.classPathTraverser != null) { - try { - this.classPathTraverser.join(); - } catch (InterruptedException e) { - // ignore as it can only happen during shutdown - } - } } @Override @@ -189,6 +182,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con } @Override + public Plugins plugins() { + return worker.getPlugins(); + } + + @Override public ConnectorStateInfo connectorStatus(String connName) { ConnectorStatus connector = statusBackingStore.get(connName); if (connector == null) @@ -233,32 +231,53 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (connType == null) throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type"); - Connector connector = getConnector(connType); - - final ConfigDef connectorConfigDef = ConnectorConfig.enrich( - (connector instanceof SourceConnector) ? SourceConnectorConfig.configDef() : SinkConnectorConfig.configDef(), - connectorConfig, - false - ); - List<ConfigValue> configValues = new ArrayList<>(); Map<String, ConfigKey> configKeys = new HashMap<>(); List<String> allGroups = new ArrayList<>(); - // do basic connector validation (name, connector type, etc.) - Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(connector, connectorConfigDef, connectorConfig); - configValues.addAll(validatedConnectorConfig.values()); - configKeys.putAll(connectorConfigDef.configKeys()); - allGroups.addAll(connectorConfigDef.groups()); - - // do custom connector-specific validation - Config config = connector.validate(connectorConfig); - ConfigDef configDef = connector.config(); - configKeys.putAll(configDef.configKeys()); - allGroups.addAll(configDef.groups()); - configValues.addAll(config.configValues()); + Connector connector = getConnector(connType); + ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector); + try { + // do basic connector validation (name, connector type, etc.) + ConfigDef basicConfigDef = (connector instanceof SourceConnector) + ? SourceConnectorConfig.configDef() + : SinkConnectorConfig.configDef(); + Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig( + connector, + basicConfigDef, + connectorConfig + ); + configValues.addAll(validatedConnectorConfig.values()); + configKeys.putAll(basicConfigDef.configKeys()); + allGroups.addAll(basicConfigDef.groups()); + + ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector) + ? new SourceConnectorConfig(plugins(), connectorConfig) + : new SinkConnectorConfig(plugins(), connectorConfig); + final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich( + plugins(), + basicConfigDef, + connectorConfig, + false + ); - return generateResult(connType, configKeys, configValues, allGroups); + // Override is required here after the enriched ConfigDef has been created successfully + configKeys.putAll(connectorConfigDef.configKeys()); + allGroups.addAll(connectorConfigDef.groups()); + + // do custom connector-specific validation + Config config = connector.validate(connectorConfig); + ConfigDef configDef = connector.config(); + configKeys.putAll(configDef.configKeys()); + allGroups.addAll(configDef.groups()); + configValues.addAll(config.configValues()); + return generateResult(connType, configKeys, configValues, allGroups); + } catch (ConfigException e) { + // Basic validation must have failed. Return the result. + return generateResult(connType, configKeys, configValues, allGroups); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } } // public for testing @@ -334,7 +353,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (tempConnectors.containsKey(connType)) { return tempConnectors.get(connType); } else { - Connector connector = worker.getConnectorFactory().newConnector(connType); + Connector connector = worker.getPlugins().newConnector(connType); tempConnectors.put(connType, connector); return connector; } @@ -383,14 +402,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return null; } } - - private void traverseClassPath() { - classPathTraverser = new Thread(new Runnable() { - @Override - public void run() { - PluginDiscovery.scanClasspathForPlugins(); - } - }, "CLASSPATH traversal thread."); - classPathTraverser.start(); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 74aef62..869cfbd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.transforms.Transformation; import java.util.ArrayList; @@ -81,6 +83,17 @@ public class ConnectorConfig extends AbstractConfig { private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records."; private static final String TRANSFORMS_DISPLAY = "Transforms"; + private final EnrichedConnectorConfig enrichedConfig; + private static class EnrichedConnectorConfig extends AbstractConfig { + EnrichedConnectorConfig(ConfigDef configDef, Map<String, String> props) { + super(configDef, props); + } + + public Object get(String key) { + return super.get(key); + } + } + public static ConfigDef configDef() { return new ConfigDef() .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) @@ -100,16 +113,25 @@ public class ConnectorConfig extends AbstractConfig { }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY); } - public ConnectorConfig() { - this(new HashMap<String, String>()); + public ConnectorConfig(Plugins plugins) { + this(plugins, new HashMap<String, String>()); + } + + public ConnectorConfig(Plugins plugins, Map<String, String> props) { + this(plugins, configDef(), props); } - public ConnectorConfig(Map<String, String> props) { - this(configDef(), props); + public ConnectorConfig(Plugins plugins, ConfigDef configDef, Map<String, String> props) { + super(configDef, props); + enrichedConfig = new EnrichedConnectorConfig( + enrich(plugins, configDef, props, true), + props + ); } - public ConnectorConfig(ConfigDef configDef, Map<String, String> props) { - super(enrich(configDef, props, true), props); + @Override + public Object get(String key) { + return enrichedConfig.get(key); } /** @@ -142,15 +164,20 @@ public class ConnectorConfig extends AbstractConfig { * <p> * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown. */ - public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) { - final List<String> transformAliases = (List<String>) ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST); - if (transformAliases == null || transformAliases.isEmpty()) { + public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) { + Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST); + if (!(transformAliases instanceof List)) { return baseConfigDef; } - final ConfigDef newDef = new ConfigDef(baseConfigDef); - - for (String alias : new LinkedHashSet<>(transformAliases)) { + ConfigDef newDef = new ConfigDef(baseConfigDef); + LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases); + for (Object o : uniqueTransformAliases) { + if (!(o instanceof String)) { + throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of " + + "type String"); + } + String alias = (String) o; final String prefix = TRANSFORMS_CONFIG + "." + alias + "."; final String group = TRANSFORMS_GROUP + ": " + alias; int orderInGroup = 0; @@ -164,7 +191,7 @@ public class ConnectorConfig extends AbstractConfig { }; newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH, "Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias, - Collections.<String>emptyList(), new TransformationClassRecommender()); + Collections.<String>emptyList(), new TransformationClassRecommender(plugins)); final ConfigDef transformationConfigDef; try { @@ -204,9 +231,19 @@ public class ConnectorConfig extends AbstractConfig { * Recommend bundled transformations. */ static final class TransformationClassRecommender implements ConfigDef.Recommender { + private final Plugins plugins; + + TransformationClassRecommender(Plugins plugins) { + this.plugins = plugins; + } + @Override public List<Object> validValues(String name, Map<String, Object> parsedConfig) { - return (List) PluginDiscovery.transformationPlugins(); + List<Object> transformationPlugins = new ArrayList<>(); + for (PluginDesc<Transformation> plugin : plugins.transformations()) { + transformationPlugins.add(plugin.pluginClass()); + } + return Collections.unmodifiableList(transformationPlugins); } @Override @@ -215,4 +252,4 @@ public class ConnectorConfig extends AbstractConfig { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 93fc6f0..5dfb808 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; @@ -168,6 +169,12 @@ public interface Herder { */ void resumeConnector(String connector); + /** + * Returns a handle to the plugin factory used by this herder and its worker. + * + * @return a reference to the plugin factory. + */ + Plugins plugins(); class Created<T> { private final boolean created; @@ -200,4 +207,4 @@ public interface Herder { return Objects.hash(created, result); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java deleted file mode 100644 index 482139a..0000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.runtime; - -import org.apache.kafka.connect.connector.Connector; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; -import org.apache.kafka.connect.tools.MockConnector; -import org.apache.kafka.connect.tools.MockSinkConnector; -import org.apache.kafka.connect.tools.MockSourceConnector; -import org.apache.kafka.connect.tools.SchemaSourceConnector; -import org.apache.kafka.connect.tools.VerifiableSinkConnector; -import org.apache.kafka.connect.tools.VerifiableSourceConnector; -import org.apache.kafka.connect.transforms.Transformation; -import org.apache.kafka.connect.util.ReflectionsUtil; -import org.reflections.Reflections; -import org.reflections.util.ClasspathHelper; -import org.reflections.util.ConfigurationBuilder; - -import java.lang.reflect.Modifier; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Set; - -public class PluginDiscovery { - - private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList( - VerifiableSourceConnector.class, VerifiableSinkConnector.class, - MockConnector.class, MockSourceConnector.class, MockSinkConnector.class, - SchemaSourceConnector.class - ); - - private static final List<Class<? extends Transformation>> TRANSFORMATION_EXCLUDES = Arrays.asList(); - - private static boolean scanned = false; - private static List<ConnectorPluginInfo> validConnectorPlugins; - private static List<Class<? extends Transformation>> validTransformationPlugins; - - public static synchronized List<ConnectorPluginInfo> connectorPlugins() { - scanClasspathForPlugins(); - return validConnectorPlugins; - } - - public static synchronized List<Class<? extends Transformation>> transformationPlugins() { - scanClasspathForPlugins(); - return validTransformationPlugins; - } - - public static synchronized void scanClasspathForPlugins() { - if (scanned) return; - ReflectionsUtil.registerUrlTypes(); - final Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); - validConnectorPlugins = Collections.unmodifiableList(connectorPlugins(reflections)); - validTransformationPlugins = Collections.unmodifiableList(transformationPlugins(reflections)); - scanned = true; - } - - private static List<ConnectorPluginInfo> connectorPlugins(Reflections reflections) { - final Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); - connectorClasses.removeAll(CONNECTOR_EXCLUDES); - - final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size()); - for (Class<? extends Connector> connectorClass : connectorClasses) { - if (isConcrete(connectorClass)) { - connectorPlugins.add(new ConnectorPluginInfo(connectorClass)); - } - } - - Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() { - @Override - public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) { - return a.className().compareTo(b.className()); - } - }); - - return connectorPlugins; - } - - private static List<Class<? extends Transformation>> transformationPlugins(Reflections reflections) { - final Set<Class<? extends Transformation>> transformationClasses = reflections.getSubTypesOf(Transformation.class); - transformationClasses.removeAll(TRANSFORMATION_EXCLUDES); - - final List<Class<? extends Transformation>> transformationPlugins = new ArrayList<>(transformationClasses.size()); - for (Class<? extends Transformation> transformationClass : transformationClasses) { - if (isConcrete(transformationClass)) { - transformationPlugins.add(transformationClass); - } - } - - Collections.sort(transformationPlugins, new Comparator<Class<? extends Transformation>>() { - @Override - public int compare(Class<? extends Transformation> a, Class<? extends Transformation> b) { - return a.getName().compareTo(b.getName()); - } - }); - - return transformationPlugins; - } - - private static boolean isConcrete(Class<?> cls) { - final int mod = cls.getModifiers(); - return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod); - } - - public static void main(String... args) { - System.out.println(connectorPlugins()); - System.out.println(transformationPlugins()); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index 21abdd0..e47d537 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -17,8 +17,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.isolation.Plugins; -import java.util.HashMap; import java.util.Map; /** @@ -35,15 +35,11 @@ public class SinkConnectorConfig extends ConnectorConfig { static ConfigDef config = ConnectorConfig.configDef() .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY); - public SinkConnectorConfig() { - this(new HashMap<String, String>()); - } - public static ConfigDef configDef() { return config; } - public SinkConnectorConfig(Map<String, String> props) { - super(config, props); + public SinkConnectorConfig(Plugins plugins, Map<String, String> props) { + super(plugins, config, props); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java index 651ac74..6915421 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.isolation.Plugins; import java.util.Map; @@ -24,7 +25,7 @@ public class SourceConnectorConfig extends ConnectorConfig { private static ConfigDef config = configDef(); - public SourceConnectorConfig(Map<String, String> props) { - super(config, props); + public SourceConnectorConfig(Plugins plugins, Map<String, String> props) { + super(plugins, config, props); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 400ae08..12802c1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -24,6 +24,7 @@ import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; @@ -65,7 +66,7 @@ public class Worker { private final ExecutorService executor; private final Time time; private final String workerId; - private final ConnectorFactory connectorFactory; + private final Plugins plugins; private final WorkerConfig config; private final Converter defaultKeyConverter; private final Converter defaultValueConverter; @@ -78,20 +79,45 @@ public class Worker { private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; - public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) { + public Worker( + String workerId, + Time time, + Plugins plugins, + WorkerConfig config, + OffsetBackingStore offsetBackingStore + ) { this.executor = Executors.newCachedThreadPool(); this.workerId = workerId; this.time = time; - this.connectorFactory = connectorFactory; + this.plugins = plugins; this.config = config; - this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + // Converters are required properties, thus getClass won't return null. + this.defaultKeyConverter = plugins.newConverter( + config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(), + config + ); this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true); - this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.defaultValueConverter = plugins.newConverter( + config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(), + config + ); this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false); - this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); - this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); - this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); - this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false); + // Same, internal converters are required properties, thus getClass won't return null. + this.internalKeyConverter = plugins.newConverter( + config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(), + config + ); + this.internalKeyConverter.configure( + config.originalsWithPrefix("internal.key.converter."), + true); + this.internalValueConverter = plugins.newConverter( + config.getClass(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG).getName(), + config + ); + this.internalValueConverter.configure( + config.originalsWithPrefix("internal.value.converter."), + false + ); this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore.configure(config); @@ -171,17 +197,23 @@ public class Worker { throw new ConnectException("Connector with name " + connName + " already exists"); final WorkerConnector workerConnector; + ClassLoader savedLoader = plugins.currentThreadLoader(); try { - final ConnectorConfig connConfig = new ConnectorConfig(connProps); + final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); log.info("Creating connector {} of type {}", connName, connClass); - final Connector connector = connectorFactory.newConnector(connClass); + final Connector connector = plugins.newConnector(connClass); workerConnector = new WorkerConnector(connName, connector, ctx, statusListener); log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); + savedLoader = plugins.compareAndSwapLoaders(connector); workerConnector.initialize(connConfig); workerConnector.transitionTo(initialState); + Plugins.compareAndSwapLoaders(savedLoader); } catch (Throwable t) { log.error("Failed to start connector {}", connName, t); + // Can't be put in a finally block because it needs to be swapped before the call on + // statusListener + Plugins.compareAndSwapLoaders(savedLoader); statusListener.onFailure(connName, t); return false; } @@ -205,7 +237,14 @@ public class Worker { WorkerConnector workerConnector = connectors.get(connName); if (workerConnector == null) throw new ConnectException("Connector " + connName + " not found in this worker."); - return workerConnector.isSinkConnector(); + + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector()); + return workerConnector.isSinkConnector(); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } } /** @@ -225,14 +264,23 @@ public class Worker { Connector connector = workerConnector.connector(); List<Map<String, String>> result = new ArrayList<>(); - String taskClassName = connector.taskClass().getName(); - for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { - Map<String, String> taskConfig = new HashMap<>(taskProps); // Ensure we don't modify the connector's copy of the config - taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); - if (sinkTopics != null) - taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); - result.add(taskConfig); + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + savedLoader = plugins.compareAndSwapLoaders(connector); + String taskClassName = connector.taskClass().getName(); + for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) { + // Ensure we don't modify the connector's copy of the config + Map<String, String> taskConfig = new HashMap<>(taskProps); + taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); + if (sinkTopics != null) { + taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); + } + result.add(taskConfig); + } + } finally { + Plugins.compareAndSwapLoaders(savedLoader); } + return result; } @@ -252,13 +300,19 @@ public class Worker { public boolean stopConnector(String connName) { log.info("Stopping connector {}", connName); - WorkerConnector connector = connectors.remove(connName); - if (connector == null) { + WorkerConnector workerConnector = connectors.remove(connName); + if (workerConnector == null) { log.warn("Ignoring stop request for unowned connector {}", connName); return false; } - connector.shutdown(); + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + savedLoader = plugins.compareAndSwapLoaders(workerConnector.connector()); + workerConnector.shutdown(); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } log.info("Stopped connector {}", connName); return true; @@ -280,8 +334,8 @@ public class Worker { * @return true if the connector is running, false if the connector is not running or is not manages by this worker. */ public boolean isRunning(String connName) { - WorkerConnector connector = connectors.get(connName); - return connector != null && connector.isRunning(); + WorkerConnector workerConnector = connectors.get(connName); + return workerConnector != null && workerConnector.isRunning(); } /** @@ -307,14 +361,20 @@ public class Worker { throw new ConnectException("Task already exists in this worker: " + id); final WorkerTask workerTask; + ClassLoader savedLoader = plugins.currentThreadLoader(); try { - final ConnectorConfig connConfig = new ConnectorConfig(connProps); + final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); + String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType); + savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); final TaskConfig taskConfig = new TaskConfig(taskProps); - final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); - final Task task = connectorFactory.newTask(taskClass); + final Task task = plugins.newTask(taskClass); log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); + // By maintaining connector's specific class loader for this thread here, we first + // search for converters within the connector dependencies, and if not found the + // plugin class loader delegates loading to the delegating classloader. Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); if (keyConverter != null) keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true); @@ -326,10 +386,14 @@ public class Worker { else valueConverter = defaultValueConverter; - workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter); + workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader); workerTask.initialize(taskConfig); + Plugins.compareAndSwapLoaders(savedLoader); } catch (Throwable t) { log.error("Failed to start task {}", id, t); + // Can't be put in a finally block because it needs to be swapped before the call on + // statusListener + Plugins.compareAndSwapLoaders(savedLoader); statusListener.onFailure(id, t); return false; } @@ -351,7 +415,8 @@ public class Worker { TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, - Converter valueConverter) { + Converter valueConverter, + ClassLoader loader) { // Decide which type of worker task we need based on the type of task. if (task instanceof SourceTask) { TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations()); @@ -361,11 +426,11 @@ public class Worker { internalKeyConverter, internalValueConverter); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, - valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, time); + valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, loader, time); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations()); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter, - valueConverter, transformationChain, time); + valueConverter, transformationChain, loader, time); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); @@ -382,7 +447,14 @@ public class Worker { log.info("Stopping task {}", task.id()); if (task instanceof WorkerSourceTask) sourceTaskOffsetCommitter.remove(task.id()); - task.stop(); + + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + savedLoader = Plugins.compareAndSwapLoaders(task.loader()); + task.stop(); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } } private void stopTasks(Collection<ConnectorTaskId> ids) { @@ -457,8 +529,8 @@ public class Worker { return internalValueConverter; } - public ConnectorFactory getConnectorFactory() { - return connectorFactory; + public Plugins getPlugins() { + return plugins; } public String workerId() { @@ -468,14 +540,37 @@ public class Worker { public void setTargetState(String connName, TargetState state) { log.info("Setting connector {} state to {}", connName, state); - WorkerConnector connector = connectors.get(connName); - if (connector != null) - connector.transitionTo(state); + WorkerConnector workerConnector = connectors.get(connName); + if (workerConnector != null) { + ClassLoader connectorLoader = + plugins.delegatingLoader().connectorLoader(workerConnector.connector()); + transitionTo(workerConnector, state, connectorLoader); + } for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) { - if (taskEntry.getKey().connector().equals(connName)) - taskEntry.getValue().transitionTo(state); + if (taskEntry.getKey().connector().equals(connName)) { + WorkerTask workerTask = taskEntry.getValue(); + transitionTo(workerTask, state, workerTask.loader()); + } } } + private void transitionTo(Object connectorOrTask, TargetState state, ClassLoader loader) { + ClassLoader savedLoader = plugins.currentThreadLoader(); + try { + savedLoader = Plugins.compareAndSwapLoaders(loader); + if (connectorOrTask instanceof WorkerConnector) { + ((WorkerConnector) connectorOrTask).transitionTo(state); + } else if (connectorOrTask instanceof WorkerTask) { + ((WorkerTask) connectorOrTask).transitionTo(state); + } else { + throw new ConnectException( + "Request for state transition on an object that is neither a " + + "WorkerConnector nor a WorkerTask: " + + connectorOrTask.getClass()); + } + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 680edaf..fe7a35a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.Map; /** @@ -122,6 +125,18 @@ public class WorkerConfig extends AbstractConfig { + "The default value of the Access-Control-Allow-Methods header allows cross origin requests for GET, POST and HEAD."; protected static final String ACCESS_CONTROL_ALLOW_METHODS_DEFAULT = ""; + public static final String PLUGIN_PATH_CONFIG = "plugin.path"; + protected static final String PLUGIN_PATH_DOC = "List of paths separated by commas (,) that " + + "contain plugins (connectors, converters, transformations). The list should consist" + + " of top level directories that include any combination of: \n" + + "a) directories immediately containing jars with plugins and their dependencies\n" + + "b) uber-jars with plugins and their dependencies\n" + + "c) directories immediately containing the package directory structure of classes of " + + "plugins and their dependencies\n" + + "Note: symlinks will be followed to discover dependencies or plugins.\n" + + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins," + + "/opt/connectors"; + /** * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to * bootstrap their own ConfigDef. @@ -155,7 +170,21 @@ public class WorkerConfig extends AbstractConfig { ACCESS_CONTROL_ALLOW_ORIGIN_DOC) .define(ACCESS_CONTROL_ALLOW_METHODS_CONFIG, Type.STRING, ACCESS_CONTROL_ALLOW_METHODS_DEFAULT, Importance.LOW, - ACCESS_CONTROL_ALLOW_METHODS_DOC); + ACCESS_CONTROL_ALLOW_METHODS_DOC) + .define( + PLUGIN_PATH_CONFIG, + Type.LIST, + null, + Importance.LOW, + PLUGIN_PATH_DOC + ); + } + + public static List<String> pluginLocations(Map<String, String> props) { + String locationList = props.get(WorkerConfig.PLUGIN_PATH_CONFIG); + return locationList == null + ? new ArrayList<String>() + : Arrays.asList(locationList.trim().split("\\s*,\\s*", -1)); } public WorkerConfig(ConfigDef definition, Map<String, String> props) { http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index d5f337d..43ad6a1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -83,8 +83,9 @@ class WorkerSinkTask extends WorkerTask { Converter keyConverter, Converter valueConverter, TransformationChain<SinkRecord> transformationChain, + ClassLoader loader, Time time) { - super(id, statusListener, initialState); + super(id, statusListener, initialState, loader); this.workerConfig = workerConfig; this.task = task; http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index ed15b85..5627145 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -86,8 +86,9 @@ class WorkerSourceTask extends WorkerTask { OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, WorkerConfig workerConfig, + ClassLoader loader, Time time) { - super(id, statusListener, initialState); + super(id, statusListener, initialState, loader); this.workerConfig = workerConfig; this.task = task; http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index 43d45d8..9b233dd 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ abstract class WorkerTask implements Runnable { protected final ConnectorTaskId id; private final TaskStatus.Listener statusListener; + protected final ClassLoader loader; private final CountDownLatch shutdownLatch = new CountDownLatch(1); private volatile TargetState targetState; private volatile boolean stopping; // indicates whether the Worker has asked the task to stop @@ -46,9 +48,11 @@ abstract class WorkerTask implements Runnable { public WorkerTask(ConnectorTaskId id, TaskStatus.Listener statusListener, - TargetState initialState) { + TargetState initialState, + ClassLoader loader) { this.id = id; this.statusListener = statusListener; + this.loader = loader; this.targetState = initialState; this.stopping = false; this.cancelled = false; @@ -58,6 +62,10 @@ abstract class WorkerTask implements Runnable { return id; } + public ClassLoader loader() { + return loader; + } + /** * Initialize the task for execution. * @param taskConfig initial configuration @@ -177,6 +185,7 @@ abstract class WorkerTask implements Runnable { @Override public void run() { + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader); try { doRun(); onShutdown(); @@ -186,6 +195,7 @@ abstract class WorkerTask implements Runnable { if (t instanceof Error) throw (Error) t; } finally { + Plugins.compareAndSwapLoaders(savedLoader); shutdownLatch.countDown(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index e908d0b..8f7503e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -954,10 +954,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable { ConnectorConfig connConfig; List<String> sinkTopics = null; if (worker.isSinkConnector(connName)) { - connConfig = new SinkConnectorConfig(configs); + connConfig = new SinkConnectorConfig(plugins(), configs); sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG); } else { - connConfig = new SourceConnectorConfig(configs); + connConfig = new SourceConnectorConfig(plugins(), configs); } final List<Map<String, String>> taskProps http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java new file mode 100644 index 0000000..da8b444 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.transforms.Transformation; +import org.reflections.Reflections; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +public class DelegatingClassLoader extends URLClassLoader { + private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); + + private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders; + private final SortedSet<PluginDesc<Connector>> connectors; + private final SortedSet<PluginDesc<Converter>> converters; + private final SortedSet<PluginDesc<Transformation>> transformations; + private final List<String> pluginPaths; + private final Map<Path, PluginClassLoader> activePaths; + + public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) { + super(new URL[0], parent); + this.pluginPaths = pluginPaths; + this.pluginLoaders = new HashMap<>(); + this.activePaths = new HashMap<>(); + this.connectors = new TreeSet<>(); + this.converters = new TreeSet<>(); + this.transformations = new TreeSet<>(); + } + + public DelegatingClassLoader(List<String> pluginPaths) { + this(pluginPaths, ClassLoader.getSystemClassLoader()); + } + + public Set<PluginDesc<Connector>> connectors() { + return connectors; + } + + public Set<PluginDesc<Converter>> converters() { + return converters; + } + + public Set<PluginDesc<Transformation>> transformations() { + return transformations; + } + + public ClassLoader connectorLoader(Connector connector) { + return connectorLoader(connector.getClass().getName()); + } + + public ClassLoader connectorLoader(String connectorClassOrAlias) { + log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias); + SortedMap<PluginDesc<?>, ClassLoader> inner = + pluginLoaders.get(connectorClassOrAlias); + if (inner == null) { + log.error( + "Plugin class loader for connector: '{}' was not found. Returning: {}", + connectorClassOrAlias, + this + ); + return this; + } + return inner.get(inner.lastKey()); + } + + private static PluginClassLoader newPluginClassLoader( + final URL pluginLocation, + final URL[] urls, + final ClassLoader parent + ) { + return (PluginClassLoader) AccessController.doPrivileged( + new PrivilegedAction() { + @Override + public Object run() { + return new PluginClassLoader(pluginLocation, urls, parent); + } + } + ); + } + + private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) { + for (PluginDesc<T> plugin : plugins) { + String pluginClassName = plugin.className(); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName); + if (inner == null) { + inner = new TreeMap<>(); + pluginLoaders.put(pluginClassName, inner); + // TODO: once versioning is enabled this line should be moved outside this if branch + log.info("Added plugin '{}'", pluginClassName); + } + inner.put(plugin, loader); + } + } + + protected void initLoaders() { + String path = null; + try { + for (String configPath : pluginPaths) { + path = configPath; + Path pluginPath = Paths.get(path).toAbsolutePath(); + // Currently 'plugin.paths' property is a list of top-level directories + // containing plugins + if (Files.isDirectory(pluginPath)) { + for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) { + log.info("Loading plugin from: {}", pluginLocation); + URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]); + if (log.isDebugEnabled()) { + log.debug("Loading plugin urls: {}", Arrays.toString(urls)); + } + PluginClassLoader loader = newPluginClassLoader( + pluginLocation.toUri().toURL(), + urls, + this + ); + + scanUrlsAndAddPlugins(loader, urls, pluginLocation); + } + } + } + + path = "classpath"; + // Finally add parent/system loader. + scanUrlsAndAddPlugins( + getParent(), + ClasspathHelper.forJavaClassPath().toArray(new URL[0]), + null + ); + } catch (InvalidPathException | MalformedURLException e) { + log.error("Invalid path in plugin path: {}. Ignoring.", path); + } catch (IOException e) { + log.error("Could not get listing for plugin path: {}. Ignoring.", path); + } catch (InstantiationException | IllegalAccessException e) { + log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e); + } + addAllAliases(); + } + + private void scanUrlsAndAddPlugins( + ClassLoader loader, + URL[] urls, + Path pluginLocation + ) throws InstantiationException, IllegalAccessException { + PluginScanResult plugins = scanPluginPath(loader, urls); + log.info("Registered loader: {}", loader); + if (!plugins.isEmpty()) { + if (loader instanceof PluginClassLoader) { + activePaths.put(pluginLocation, (PluginClassLoader) loader); + } + + addPlugins(plugins.connectors(), loader); + connectors.addAll(plugins.connectors()); + addPlugins(plugins.converters(), loader); + converters.addAll(plugins.converters()); + addPlugins(plugins.transformations(), loader); + transformations.addAll(plugins.transformations()); + } + } + + private PluginScanResult scanPluginPath( + ClassLoader loader, + URL[] urls + ) throws InstantiationException, IllegalAccessException { + ConfigurationBuilder builder = new ConfigurationBuilder(); + builder.setClassLoaders(new ClassLoader[]{loader}); + builder.addUrls(urls); + Reflections reflections = new Reflections(builder); + + return new PluginScanResult( + getPluginDesc(reflections, Connector.class, loader), + getPluginDesc(reflections, Converter.class, loader), + getPluginDesc(reflections, Transformation.class, loader) + ); + } + + private <T> Collection<PluginDesc<T>> getPluginDesc( + Reflections reflections, + Class<T> klass, + ClassLoader loader + ) throws InstantiationException, IllegalAccessException { + Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass); + + Collection<PluginDesc<T>> result = new ArrayList<>(); + for (Class<? extends T> plugin : plugins) { + if (PluginUtils.isConcrete(plugin)) { + // Temporary workaround until all the plugins are versioned. + if (Connector.class.isAssignableFrom(plugin)) { + result.add( + new PluginDesc<>( + plugin, + ((Connector) plugin.newInstance()).version(), + loader + ) + ); + } else { + result.add(new PluginDesc<>(plugin, "undefined", loader)); + } + } + } + return result; + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + if (!PluginUtils.shouldLoadInIsolation(name)) { + // There are no paths in this classloader, will attempt to load with the parent. + return super.loadClass(name, resolve); + } + + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name); + if (inner != null) { + log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey())); + ClassLoader pluginLoader = inner.get(inner.lastKey()); + return pluginLoader instanceof PluginClassLoader + ? ((PluginClassLoader) pluginLoader).loadClass(name, resolve) + : super.loadClass(name, resolve); + } + + Class<?> klass = null; + for (PluginClassLoader loader : activePaths.values()) { + try { + klass = loader.loadClass(name, resolve); + break; + } catch (ClassNotFoundException e) { + // Not found in this loader. + } + } + if (klass == null) { + return super.loadClass(name, resolve); + } + return klass; + } + + private void addAllAliases() { + addAliases(connectors); + addAliases(converters); + addAliases(transformations); + } + + private <S> void addAliases(Collection<PluginDesc<S>> plugins) { + for (PluginDesc<S> plugin : plugins) { + if (PluginUtils.isAliasUnique(plugin, plugins)) { + String simple = PluginUtils.simpleName(plugin); + String pruned = PluginUtils.prunedName(plugin); + SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className()); + pluginLoaders.put(simple, inner); + if (simple.equals(pruned)) { + log.info("Added alias '{}' to plugin '{}'", simple, plugin.className()); + } else { + pluginLoaders.put(pruned, inner); + log.info( + "Added aliases '{}' and '{}' to plugin '{}'", + simple, + pruned, + plugin.className() + ); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java new file mode 100644 index 0000000..07438e9 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.net.URLClassLoader; + +public class PluginClassLoader extends URLClassLoader { + private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); + private final URL pluginLocation; + + public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { + super(urls, parent); + this.pluginLocation = pluginLocation; + } + + public PluginClassLoader(URL pluginLocation, URL[] urls) { + super(urls); + this.pluginLocation = pluginLocation; + } + + public String location() { + return pluginLocation.toString(); + } + + @Override + public String toString() { + return "PluginClassLoader{pluginLocation=" + pluginLocation + "}"; + } + + @Override + protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + Class<?> klass = findLoadedClass(name); + if (klass == null) { + if (PluginUtils.shouldLoadInIsolation(name)) { + try { + klass = findClass(name); + } catch (ClassNotFoundException e) { + // Not found in loader's path. Search in parents. + } + } + if (klass == null) { + klass = super.loadClass(name, false); + } + } + if (resolve) { + resolveClass(klass); + } + return klass; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java new file mode 100644 index 0000000..a607704 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.maven.artifact.versioning.DefaultArtifactVersion; + +import java.util.Objects; + +public class PluginDesc<T> implements Comparable<PluginDesc<T>> { + private final Class<? extends T> klass; + private final String name; + private final String version; + private final DefaultArtifactVersion encodedVersion; + private final PluginType type; + private final String typeName; + private final String location; + + public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) { + this.klass = klass; + this.name = klass.getName(); + this.version = version; + this.encodedVersion = new DefaultArtifactVersion(version); + this.type = PluginType.from(klass); + this.typeName = type.toString(); + this.location = loader instanceof PluginClassLoader + ? ((PluginClassLoader) loader).location() + : "classpath"; + } + + @Override + public String toString() { + return "PluginDesc{" + + "klass=" + klass + + ", name='" + name + '\'' + + ", version='" + version + '\'' + + ", encodedVersion=" + encodedVersion + + ", type=" + type + + ", typeName='" + typeName + '\'' + + ", location='" + location + '\'' + + '}'; + } + + public Class<? extends T> pluginClass() { + return klass; + } + + @JsonProperty("class") + public String className() { + return name; + } + + @JsonProperty("version") + public String version() { + return version; + } + + public PluginType type() { + return type; + } + + @JsonProperty("type") + public String typeName() { + return typeName; + } + + @JsonProperty("location") + public String location() { + return location; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PluginDesc)) { + return false; + } + PluginDesc<?> that = (PluginDesc<?>) o; + return Objects.equals(klass, that.klass) && + Objects.equals(version, that.version) && + type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(klass, version, type); + } + + @Override + public int compareTo(PluginDesc other) { + int nameComp = name.compareTo(other.name); + return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java new file mode 100644 index 0000000..f3d2f21 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Collection; + +public class PluginScanResult { + private final Collection<PluginDesc<Connector>> connectors; + private final Collection<PluginDesc<Converter>> converters; + private final Collection<PluginDesc<Transformation>> transformations; + + public PluginScanResult( + Collection<PluginDesc<Connector>> connectors, + Collection<PluginDesc<Converter>> converters, + Collection<PluginDesc<Transformation>> transformations + ) { + this.connectors = connectors; + this.converters = converters; + this.transformations = transformations; + } + + public Collection<PluginDesc<Connector>> connectors() { + return connectors; + } + + public Collection<PluginDesc<Converter>> converters() { + return converters; + } + + public Collection<PluginDesc<Transformation>> transformations() { + return transformations; + } + + public boolean isEmpty() { + return connectors().isEmpty() && converters().isEmpty() && transformations().isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45f22617/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java new file mode 100644 index 0000000..5649213 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.isolation; + +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.transforms.Transformation; + +import java.util.Locale; + +public enum PluginType { + SOURCE(SourceConnector.class), + SINK(SinkConnector.class), + CONNECTOR(Connector.class), + CONVERTER(Converter.class), + TRANSFORMATION(Transformation.class), + UNKNOWN(Object.class); + + private Class<?> klass; + + PluginType(Class<?> klass) { + this.klass = klass; + } + + public static PluginType from(Class<?> klass) { + for (PluginType type : PluginType.values()) { + if (type.klass.isAssignableFrom(klass)) { + return type; + } + } + return UNKNOWN; + } + + public String simpleName() { + return klass.getSimpleName(); + } + + @Override + public String toString() { + return super.toString().toLowerCase(Locale.ROOT); + } +}
