Repository: kafka Updated Branches: refs/heads/0.10.0 496bd3fd4 -> 206757eeb
KAFKA-3316: Add REST API for listing connector plugins Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1090 from Ishiihara/kafka-3316 (cherry picked from commit 78fa20eb58a948abd9ad4e44acfed606400a47f3) Signed-off-by: Ewen Cheslack-Postava <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/206757ee Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/206757ee Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/206757ee Branch: refs/heads/0.10.0 Commit: 206757eeb18589952291ce1a5578c66de3669f4c Parents: 496bd3f Author: Liquan Pei <[email protected]> Authored: Fri Mar 25 16:46:53 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Mar 25 16:47:09 2016 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/AbstractHerder.java | 32 +++++++++++- .../rest/entities/ConnectorPluginInfo.java | 54 ++++++++++++++++++++ .../resources/ConnectorPluginsResource.java | 10 ++++ .../connect/runtime/AbstractHerderTest.java | 1 - .../resources/ConnectorPluginsResourceTest.java | 22 +++++++- 5 files changed, 116 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 8d83644..a97c4db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -26,20 +26,29 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.tools.VerifiableSinkConnector; +import org.apache.kafka.connect.tools.VerifiableSourceConnector; import org.apache.kafka.connect.util.ConnectorTaskId; +import org.reflections.Reflections; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.io.UnsupportedEncodingException; +import java.lang.reflect.Modifier; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -69,7 +78,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con protected final StatusBackingStore statusBackingStore; private final String workerId; - protected Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); + private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); + private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class); + private static List<ConnectorPluginInfo> validConnectorPlugins; public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) { this.worker = worker; @@ -189,6 +200,25 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return generateResult(connType, resultConfigKeys, configValues, allGroups); } + public static List<ConnectorPluginInfo> connectorPlugins() { + if (validConnectorPlugins != null) { + return validConnectorPlugins; + } + + Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath())); + Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class); + connectorClasses.removeAll(SKIPPED_CONNECTORS); + List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>(); + for (Class<? extends Connector> connectorClass: connectorClasses) { + int mod = connectorClass.getModifiers(); + if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) { + connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName())); + } + } + validConnectorPlugins = connectorPlugins; + return connectorPlugins; + } + // public for testing public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) { int errorCount = 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java new file mode 100644 index 0000000..097142e --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class ConnectorPluginInfo { + + private String clazz; + + @JsonCreator + public ConnectorPluginInfo(@JsonProperty("class") String clazz) { + this.clazz = clazz; + } + + @JsonProperty("class") + public String clazz() { + return clazz; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ConnectorPluginInfo that = (ConnectorPluginInfo) o; + return Objects.equals(clazz, that.clazz); + } + + @Override + public int hashCode() { + return Objects.hash(clazz); + } + + @Override + public String toString() { + return clazz; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 8439707..9e87d0c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -17,12 +17,16 @@ package org.apache.kafka.connect.runtime.rest.resources; +import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; +import java.util.List; import java.util.Map; import javax.ws.rs.Consumes; +import javax.ws.rs.GET; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -46,4 +50,10 @@ public class ConnectorPluginsResource { final Map<String, String> connectorConfig) throws Throwable { return herder.validateConfigs(connType, connectorConfig); } + + @GET + @Path("/") + public List<ConnectorPluginInfo> listConnectorPlugins() { + return AbstractHerder.connectorPlugins(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 1dc5784..e4084a8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -114,5 +114,4 @@ public class AbstractHerderTest extends EasyMockSupport { verifyAll(); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/206757ee/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 625c91f..1049e7e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.AbstractHerder; @@ -32,6 +32,11 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.tools.VerifiableSinkConnector; +import org.apache.kafka.connect.tools.VerifiableSourceConnector; import org.easymock.EasyMock; import org.easymock.IAnswer; import org.junit.Before; @@ -49,8 +54,11 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @RunWith(PowerMockRunner.class) @PrepareForTest(RestServer.class) @@ -64,6 +72,7 @@ public class ConnectorPluginsResourceTest { } private static final ConfigInfos CONFIG_INFOS; + static { List<ConfigInfo> configs = new LinkedList<>(); @@ -120,6 +129,17 @@ public class ConnectorPluginsResourceTest { PowerMock.verifyAll(); } + @Test + public void testListConnectorPlugins() { + Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins()); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName()))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName()))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName()))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName()))); + assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName()))); + assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName()))); + } + /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends Connector {
