C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814358703



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##########
@@ -168,6 +173,44 @@ public Connector newConnector(String 
connectorClassOrAlias) {
         return newPlugin(klass);
     }
 
+    public Converter newConverter(String className) throws 
ClassNotFoundException {

Review comment:
       Nit: Might be nice to have (brief) Javadocs on these methods. I was 
thinking "don't we already use the `Plugins` class to instantiate converters? 
Why do we need another?". After taking a look it's clear now that this is for 
just instantiating a converter in order to grab its `ConfigDef`, and the other 
methods (like `Plugins.newConverter(AbstractConfig config, String 
classPropertyName, ClassLoaderUsage classLoaderUsage)`) are for instantiating 
and configuring converters for use with tasks.
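   For instance, a Javadoc along these lines might capture the distinction (a sketch with stand-in types, not the PR's actual docs):

```java
// Stand-in types so this sketch compiles on its own; the real Converter
// lives in org.apache.kafka.connect.storage.
interface Converter { }

class JsonConverter implements Converter { }

public class PluginsJavadocSketch {
    /**
     * Instantiates (but does not configure) the named {@link Converter} so
     * that its ConfigDef can be inspected. Unlike the overloads that take
     * an AbstractConfig, the result is not ready for use with tasks.
     */
    public static Converter newConverter(String className) throws Exception {
        return (Converter) Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Converter converter = newConverter("JsonConverter");
        System.out.println(converter.getClass().getSimpleName()); // prints "JsonConverter"
    }
}
```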

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = 
Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = 
Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, 
MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = 
Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during 
runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, 
PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), 
PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : 
herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, 
PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), 
PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : 
herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, 
PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), 
PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, 
PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), 
PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : 
herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, 
PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), 
PluginType.HEADER_CONVERTER);
+        }

Review comment:
       Also, could we abstract and simplify the logic for populating 
`connectorPlugins`? It looks like the only wrinkle here preventing us from 
writing a generic `private void addConnectorPlugins(Collection<PluginDesc<?>> plugins, 
PluginType pluginType)` method and then invoking that once each for sink 
connectors, source connectors, converters, etc. is that the `Plugins` class 
doesn't expose source and sink connectors separately.
   
   Considering that `Plugins::connectors` is only used (excluding test code) in 
one place--this class--could we refactor that into two separate 
`Plugins::sinkConnectors` and `Plugins::sourceConnectors` methods?
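   To illustrate, a self-contained sketch of that generic helper (`PluginDesc` and `PluginType` here are simplified stand-ins, not the real classes from `org.apache.kafka.connect.runtime.isolation`):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class AddPluginsSketch {
    // Simplified stand-ins for the real isolation classes.
    enum PluginType { SINK, SOURCE, CONVERTER }

    static class PluginDesc<T> {
        final Class<? extends T> pluginClass;
        PluginDesc(Class<? extends T> pluginClass) { this.pluginClass = pluginClass; }
        Class<? extends T> pluginClass() { return pluginClass; }
        String className() { return pluginClass.getName(); }
    }

    // Stand-in exclude list (Thread.class plays the role of a mock/verifiable plugin).
    static final List<Class<?>> EXCLUDES = Arrays.asList(Thread.class);

    final List<String> connectorPlugins = new ArrayList<>();

    // One generic method replaces the five near-identical for-loops: each
    // plugin kind is registered with a single call.
    void addConnectorPlugins(Collection<? extends PluginDesc<?>> plugins, PluginType type) {
        for (PluginDesc<?> plugin : plugins) {
            if (!EXCLUDES.contains(plugin.pluginClass())) {
                connectorPlugins.add(plugin.className() + " (" + type + ")");
            }
        }
    }

    public static void main(String[] args) {
        AddPluginsSketch resource = new AddPluginsSketch();
        resource.addConnectorPlugins(
                Arrays.asList(new PluginDesc<>(String.class), new PluginDesc<>(Thread.class)),
                PluginType.SINK);
        resource.addConnectorPlugins(
                Arrays.asList(new PluginDesc<>(Integer.class)), PluginType.CONVERTER);
        System.out.println(resource.connectorPlugins); // excluded Thread.class is skipped
    }
}
```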

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
##########
@@ -86,4 +87,12 @@
     default SchemaAndValue toConnectData(String topic, Headers headers, byte[] 
value) {
         return toConnectData(topic, value);
     }
+
+    /**
+     * Configuration specification for this set of converters.

Review comment:
       I know this matches the Javadoc from the KIP but I'm wondering why we're 
using "converters" plural here? Would it be clearer to say "for this 
converter" instead?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -613,7 +614,7 @@ public static ConfigInfos generateResult(String connType, 
Map<String, ConfigKey>
         return new ConfigInfos(connType, errorCount, groups, configInfoList);
     }
 
-    private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
+    public static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {

Review comment:
       Nit: kind of strange that this method (and honestly a lot of the config 
wrangling logic) is contained in the `AbstractHerder` class. Might be worth 
refactoring into a separate config utils class/package. Probably best to leave 
for a separate PR, but if you agree that that'd be cleaner I could see about 
doing that work as a follow-up.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##########
@@ -168,6 +173,44 @@ public Connector newConnector(String 
connectorClassOrAlias) {
         return newPlugin(klass);
     }
 
+    public Converter newConverter(String className) throws 
ClassNotFoundException {
+        Class<? extends Converter> klass = pluginClass(
+                delegatingLoader,
+                className,
+                Converter.class
+        );
+        return newPlugin(klass);
+    }
+
+    public HeaderConverter newHeaderConverter(String className) throws 
ClassNotFoundException {
+        Class<? extends HeaderConverter> klass = pluginClass(
+                delegatingLoader,
+                className,
+                HeaderConverter.class
+        );
+        return newPlugin(klass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Predicate newPredicate(String className) throws 
ClassNotFoundException {

Review comment:
       This should match the type-safe declaration for `newTransformation` IMO:
   
   ```suggestion
       @SuppressWarnings({"unchecked", "rawtypes"})
       public <R extends ConnectRecord<R>> Predicate<R> newPredicate(String 
className) throws ClassNotFoundException {
   ```
   
   I think this is due to testing woes with the 
`AbstractHerderTest::testConnectorPluginConfig` test case; I've left a 
suggestion in that section for how to keep the type safety here with a small 
change there.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Do we need this method declaration at all? Looks like it might have been 
left in accidentally while trying to wrestle with SpotBugs?
   ```suggestion
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +751,38 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override

Review comment:
      (Left thoughts in `ConnectorPluginsResource` about possible ways we 
could refactor this so that the REST layer doesn't have to handle as much of 
the classloading logic)

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -901,6 +902,48 @@ public void 
testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors(
         assertInfoValue(infos, "config.extra2", "value.extra2", "error 
extra2");
     }
 
+    @Test
+    public void testConnectorPluginConfig() throws Exception {
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(
+                        Worker.class,
+                        String.class,
+                        String.class,
+                        StatusBackingStore.class,
+                        ConfigBackingStore.class,
+                        ConnectorClientConfigOverridePolicy.class
+                )
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, 
configStore, noneConnectorClientConfigOverridePolicy)
+                .addMockedMethod("generation")
+                .createMock();
+
+        
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(new 
SampleSourceConnector()).anyTimes();
+        
EasyMock.expect(plugins.newConverter(EasyMock.anyString())).andReturn(new 
SampleConverterWithHeaders()).anyTimes();
+        
EasyMock.expect(plugins.newHeaderConverter(EasyMock.anyString())).andReturn(new 
SampleHeaderConverter()).anyTimes();
+        
EasyMock.expect(plugins.newPredicate(EasyMock.anyString())).andReturn(new 
SamplePredicate()).anyTimes();

Review comment:
       If we want to keep the type-safe signature for `Plugins::newPredicate`:
   ```suggestion
           
EasyMock.expect(plugins.<SourceRecord>newPredicate(EasyMock.anyString())).andReturn(new
 SamplePredicate()).anyTimes();
   ```
   
   Note that this also requires adding an import for 
`org.apache.kafka.connect.source.SourceRecord` to this test class.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -123,4 +171,15 @@ private String normalizedPluginName(String pluginName) {
             ? pluginName.substring(0, pluginName.length() - 
ALIAS_SUFFIX.length())
             : pluginName;
     }
+
+    String getAlias(String name) {
+        name = normalizedPluginName(name);
+        int lastIndexOf = name.lastIndexOf('.');
+        return lastIndexOf >= 0 ? name.substring(lastIndexOf + 1) : name;
+    }
+
+    private synchronized List<ConfigKeyInfo> doGetConfigDef(final String 
pluginName) {
+        PluginType pluginType = 
pluginsByType.getOrDefault(getAlias(pluginName), PluginType.UNKNOWN);

Review comment:
       Also, it seems like using a default of `PluginType.UNKNOWN` here might 
be suboptimal. If someone wants to view the config for a REST extension, 
for example, they'll end up seeing an error message later on (in 
`AbstractHerder::connectorPluginConfig`) that says something like "Invalid 
plugin type unknown. Valid types are..."
   
   I think it'd be clearer to users if we could differentiate between these two 
cases:
   1. User requests config for a plugin that does exist on the worker, but 
which we don't expose config information via the REST API for (such as a REST 
extension or a config provider)
   2. User requests config for a plugin that doesn't exist on the worker
   
   Status-wise, in case 1 a 400 response probably makes sense, but for case 2 
a 404 response might be more applicable.
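   To sketch the distinction (hypothetical helper, not code from the PR):

```java
public class ConfigDefStatusSketch {
    // The plugin name is checked against what the worker actually has loaded
    // before deciding whether the request is merely unsupported (400) or
    // unknown (404).
    static int statusFor(boolean existsOnWorker, boolean exposesConfigDef) {
        if (!existsOnWorker) return 404;   // plugin not present on the worker
        if (!exposesConfigDef) return 400; // e.g. REST extension, config provider
        return 200;
    }

    public static void main(String[] args) {
        System.out.println(statusFor(true, false));  // prints 400
        System.out.println(statusFor(false, false)); // prints 404
    }
}
```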

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = 
Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = 
Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, 
MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = 
Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during 
runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, 
PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), 
PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : 
herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, 
PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), 
PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : 
herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, 
PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), 
PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, 
PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), 
PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : 
herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, 
PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), 
PluginType.HEADER_CONVERTER);
+        }

Review comment:
       It seems like we're duplicating some of the logic contained in `Plugins` 
into this class by tracking class alias names and pre-computing plugin type 
based on them.
   
   Did you consider a `Herder` method that only accepted the name of the 
plugin, and took on the responsibility of deducing the plugin type itself?
   ```java
   List<ConfigKeyInfo> connectorPluginConfig(String pluginName);
   ```
   
   In `AbstractHerder`, we could do something like this:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            Object plugin = plugins().newPlugin(pluginName);
            PluginType pluginType = PluginType.from(plugin.getClass());
               List<ConfigKeyInfo> results = new ArrayList<>();
               ConfigDef configDefs;
               switch (pluginType) {
                   case SINK:
                   case SOURCE:
                       configDefs = ((Connector) plugin).config();
                       break;
                   case CONVERTER:
                       configDefs = ((Converter) plugin).config();
                       break;
               // ... Rest of switch statement follows same pattern, and rest 
of the method remains unchanged
       }
   ```
   
   And in `Plugins` we could do this:
   ```java
       public Object newPlugin(String classOrAlias) throws 
ClassNotFoundException {
           Class<? extends Object> klass = pluginClass(delegatingLoader, 
classOrAlias, Object.class);
           return newPlugin(klass);
       }
   ```
   
   
   Or alternatively, we could introduce a common interface for plugins that 
expose a `ConfigDef`:
   ```java
   interface DefinedConfigPlugin {
       ConfigDef config();
   }
   ```
   (this could be kept package-private so as not to qualify as a public interface)
   
   And we could simplify some of the `AbstractHerder` logic:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            DefinedConfigPlugin plugin = plugins().newDefinedConfigPlugin(pluginName);
               ConfigDef configDefs = plugin.config();
               // No switch statement on plugin type necessary
            // ... Rest of the method remains unchanged
       }
   ```
   
   And `Plugins` would still be fairly simple:
   ```java
       public DefinedConfigPlugin newDefinedConfigPlugin(String classOrAlias) 
throws ClassNotFoundException {
           Class<? extends DefinedConfigPlugin> klass = 
pluginClass(delegatingLoader, classOrAlias, DefinedConfigPlugin.class);
           return newPlugin(klass);
       }
   ```
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -17,30 +17,32 @@
 package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 
 import java.util.Objects;
 
 public class ConnectorPluginInfo {
     private final String className;
-    private final ConnectorType type;
+    private final PluginType type;
     private final String version;
 
     @JsonCreator
     public ConnectorPluginInfo(
         @JsonProperty("class") String className,
-        @JsonProperty("type") ConnectorType type,
+        @JsonProperty("type") PluginType type,
         @JsonProperty("version") String version
     ) {
         this.className = className;
         this.type = type;
         this.version = version;
     }
 
-    public ConnectorPluginInfo(PluginDesc<Connector> plugin) {
-        this(plugin.className(), ConnectorType.from(plugin.pluginClass()), 
plugin.version());
+    public ConnectorPluginInfo(PluginDesc<?> plugin, PluginType type) {
+        this(plugin.className(), type, plugin.version());

Review comment:
       Isn't the `PluginType` argument redundant if we already have a 
`PluginDesc`?
   ```suggestion
       public ConnectorPluginInfo(PluginDesc<?> plugin) {
           this(plugin.className(), plugin.type(), plugin.version());
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -123,4 +171,15 @@ private String normalizedPluginName(String pluginName) {
             ? pluginName.substring(0, pluginName.length() - 
ALIAS_SUFFIX.length())
             : pluginName;
     }
+
+    String getAlias(String name) {
+        name = normalizedPluginName(name);
+        int lastIndexOf = name.lastIndexOf('.');
+        return lastIndexOf >= 0 ? name.substring(lastIndexOf + 1) : name;
+    }
+
+    private synchronized List<ConfigKeyInfo> doGetConfigDef(final String 
pluginName) {
+        PluginType pluginType = 
pluginsByType.getOrDefault(getAlias(pluginName), PluginType.UNKNOWN);

Review comment:
       Doesn't this introduce the possibility of conflict between two plugins 
(or I guess specifically connectors, since those are the only ones we strip 
suffixes from) which have different fully-qualified class names, but the same 
simple class name? Or where they would have the same simple class name, except 
that one ends with `Connector` and the other doesn't?
   
   In practice this is unlikely to come up, but since we support it at the 
moment, it's probably best to take care here to avoid introducing a potential 
regression, especially if someone for some reason wants to run, e.g., two 
different `MySqlSink` connectors on their worker.
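   To make the collision concrete, here's a standalone copy of the alias logic above (`normalizedPluginName` plus `getAlias`, simplified for illustration):

```java
public class AliasCollisionSketch {
    // Strip a trailing "Connector" suffix, then take the simple class name —
    // mirroring getAlias/normalizedPluginName in the diff above.
    static String getAlias(String name) {
        if (name.endsWith("Connector")) {
            name = name.substring(0, name.length() - "Connector".length());
        }
        int lastDot = name.lastIndexOf('.');
        return lastDot >= 0 ? name.substring(lastDot + 1) : name;
    }

    public static void main(String[] args) {
        // Two distinct plugins collapse to the same map key:
        System.out.println(getAlias("com.foo.MySqlSink"));          // prints "MySqlSink"
        System.out.println(getAlias("org.bar.MySqlSinkConnector")); // prints "MySqlSink"
    }
}
```

(The package names here are made up; any two plugins whose names differ only in package or in the `Connector` suffix would clobber each other's entry in `pluginsByType`.)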

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +146,23 @@ public ConfigInfos validateConfigs(
 
     @GET
     @Path("/")
-    public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return getConnectorPlugins();
+    public List<ConnectorPluginInfo> 
listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") 
boolean connectorsOnly) {
+        return getConnectorPlugins(connectorsOnly);

Review comment:
       Nit: I know this is following the [existing 
style](https://github.com/apache/kafka/blob/c2ee1411c8bb73fcf96c12abeedbfe6fde2c6354/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L101-L118)
 in the code base, but do you think the separate `getConnectorPlugins` method 
is actually bringing anything to the table readability-wise?
   
   Think we could just as easily eliminate the `getConnectorPlugins` method and 
inline it directly here. Same thought with `getConnectorConfigDef`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

