tombentley commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817623175



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -127,10 +131,26 @@ public DelegatingClassLoader(List<String> pluginPaths) {
         this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
     }
 
+    @SuppressWarnings("unchecked")
     public Set<PluginDesc<Connector>> connectors() {
+        Set<PluginDesc<Connector>> connectors = new TreeSet<>();
+        for (PluginDesc<SinkConnector> sinkConnector : sinkConnectors) {
+            connectors.add((PluginDesc<Connector>) (PluginDesc<? extends 
Connector>) sinkConnector);
+        }
+        for (PluginDesc<SourceConnector> sourceConnector : sourceConnectors) {
+            connectors.add((PluginDesc<Connector>) (PluginDesc<? extends 
Connector>) sourceConnector);
+        }
         return connectors;

Review comment:
       ```suggestion
           Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors);
           connectors.addAll((Set) sourceConnectors);
           return connectors;
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       Should it really be called `connectorPluginConfig` when it handles other 
plugins too?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -17,30 +17,32 @@
 package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 
 import java.util.Objects;
 
 public class ConnectorPluginInfo {

Review comment:
       Should the name remain as `ConnectorPluginInfo` when it's no longer just 
for connector plugins?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
##########
@@ -17,22 +17,24 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Locale;
 
 public enum PluginType {
     SOURCE(SourceConnector.class),
     SINK(SinkConnector.class),
-    CONNECTOR(Connector.class),

Review comment:
       This makes me wonder why CONNECTOR was ever in this enum. Do you know,
@mimaison / @C0urante?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +135,23 @@ public ConfigInfos validateConfigs(
 
     @GET
     @Path("/")
-    public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return getConnectorPlugins();
-    }
-
-    // TODO: improve once plugins are allowed to be added/removed during 
runtime.
-    private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
-        if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) 
{
-                if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
-                    connectorPlugins.add(new ConnectorPluginInfo(plugin));
-                }
+    public List<ConnectorPluginInfo> 
listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") 
boolean connectorsOnly) {
+        synchronized (this) {
+            if (connectorsOnly) {
+                Set<String> types = new 
HashSet<>(Arrays.asList(PluginType.SINK.toString(), 
PluginType.SOURCE.toString()));
+                return 
Collections.unmodifiableList(connectorPlugins.stream().filter(p -> 
types.contains(p.type())).collect(Collectors.toList()));

Review comment:
       Is it worth using `Set.contains` for this, rather than `.equals` and 
`||`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to