Repository: kafka
Updated Branches:
  refs/heads/trunk 23b50093f -> 78fa20eb5


KAFKA-3316: Add REST API for listing connector plugins

Author: Liquan Pei <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #1090 from Ishiihara/kafka-3316


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78fa20eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78fa20eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78fa20eb

Branch: refs/heads/trunk
Commit: 78fa20eb58a948abd9ad4e44acfed606400a47f3
Parents: 23b5009
Author: Liquan Pei <[email protected]>
Authored: Fri Mar 25 16:46:53 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Fri Mar 25 16:46:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/AbstractHerder.java   | 32 +++++++++++-
 .../rest/entities/ConnectorPluginInfo.java      | 54 ++++++++++++++++++++
 .../resources/ConnectorPluginsResource.java     | 10 ++++
 .../connect/runtime/AbstractHerderTest.java     |  1 -
 .../resources/ConnectorPluginsResourceTest.java | 22 +++++++-
 5 files changed, 116 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/78fa20eb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 8d83644..a97c4db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -26,20 +26,29 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -69,7 +78,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected final StatusBackingStore statusBackingStore;
     private final String workerId;
 
-    protected Map<String, Connector> tempConnectors = new 
ConcurrentHashMap<>();
+    private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+    private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = 
Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, 
VerifiableSinkConnector.class);
+    private static List<ConnectorPluginInfo> validConnectorPlugins;
 
     public AbstractHerder(Worker worker, StatusBackingStore 
statusBackingStore, String workerId) {
         this.worker = worker;
@@ -189,6 +200,25 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         return generateResult(connType, resultConfigKeys, configValues, 
allGroups);
     }
 
+    /**
+     * List the connector plugins available on the Java classpath.
+     *
+     * <p>The classpath is scanned with Reflections for subtypes of {@link Connector}. Classes listed
+     * in {@code SKIPPED_CONNECTORS}, abstract classes and interfaces are left out. The scan runs
+     * once; its result is cached in {@code validConnectorPlugins} and returned by all later calls.
+     *
+     * <p>The method is synchronized so that concurrent first callers do not each scan the classpath
+     * and so that the cached list is safely published to other threads.
+     *
+     * @return an unmodifiable list holding one {@link ConnectorPluginInfo} per retained connector class
+     */
+    public static synchronized List<ConnectorPluginInfo> connectorPlugins() {
+        if (validConnectorPlugins != null) {
+            return validConnectorPlugins;
+        }
+
+        Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
+        Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
+        connectorClasses.removeAll(SKIPPED_CONNECTORS);
+        List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            int mod = connectorClass.getModifiers();
+            if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
+                // NOTE(review): getCanonicalName() returns null for anonymous and local classes;
+                // confirm the scan cannot surface such Connector subclasses.
+                connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+            }
+        }
+        // The cached list is shared by every caller, so only a read-only view is handed out.
+        validConnectorPlugins = Collections.unmodifiableList(connectorPlugins);
+        return validConnectorPlugins;
+    }
+
     // public for testing
     public static ConfigInfos generateResult(String connType, Map<String, 
ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
         int errorCount = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/78fa20eb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
new file mode 100644
index 0000000..097142e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p/> http://www.apache.org/licenses/LICENSE-2.0 <p/> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * REST entity describing a single connector plugin by its class name.
+ *
+ * <p>The class name is read from and written to the JSON property {@code "class"}. Equality,
+ * hash code and string form are all derived from that one value.
+ */
+public class ConnectorPluginInfo {
+
+    // Assigned once in the constructor and never changed, so instances are immutable.
+    private final String clazz;
+
+    /**
+     * @param clazz the connector class name, bound to the JSON property {@code "class"}
+     */
+    @JsonCreator
+    public ConnectorPluginInfo(@JsonProperty("class") String clazz) {
+        this.clazz = clazz;
+    }
+
+    /**
+     * @return the connector class name given at construction
+     */
+    @JsonProperty("class")
+    public String clazz() {
+        return clazz;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ConnectorPluginInfo that = (ConnectorPluginInfo) o;
+        return Objects.equals(clazz, that.clazz);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clazz);
+    }
+
+    @Override
+    public String toString() {
+        return clazz;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/78fa20eb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 8439707..9e87d0c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -17,12 +17,16 @@
 
 package org.apache.kafka.connect.runtime.rest.resources;
 
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 
+import java.util.List;
 import java.util.Map;
 
 import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -46,4 +50,10 @@ public class ConnectorPluginsResource {
                                        final Map<String, String> 
connectorConfig) throws Throwable {
         return herder.validateConfigs(connType, connectorConfig);
     }
+
+    /**
+     * Handle {@code GET} on this resource's root path by returning the connector plugins
+     * reported by {@link AbstractHerder#connectorPlugins()}.
+     *
+     * @return the connector plugin descriptions
+     */
+    @GET
+    @Path("/")
+    public List<ConnectorPluginInfo> listConnectorPlugins() {
+        return AbstractHerder.connectorPlugins();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78fa20eb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 1dc5784..e4084a8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -114,5 +114,4 @@ public class AbstractHerderTest extends EasyMockSupport {
 
         verifyAll();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78fa20eb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 625c91f..1049e7e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
@@ -32,6 +32,11 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
@@ -49,8 +54,11 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(RestServer.class)
@@ -64,6 +72,7 @@ public class ConnectorPluginsResourceTest {
     }
 
     private static final ConfigInfos CONFIG_INFOS;
+
     static {
         List<ConfigInfo> configs = new LinkedList<>();
 
@@ -120,6 +129,17 @@ public class ConnectorPluginsResourceTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testListConnectorPlugins() {
+        Set<ConnectorPluginInfo> connectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins());
+        assertFalse(connectorPlugins.contains(new 
ConnectorPluginInfo(Connector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new 
ConnectorPluginInfo(SourceConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new 
ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new 
ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new 
ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
+        assertTrue(connectorPlugins.contains(new 
ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
+    }
+
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends 
Connector {
 

Reply via email to