tombentley commented on a change in pull request #11572: URL: https://github.com/apache/kafka/pull/11572#discussion_r817623175
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ########## @@ -127,10 +131,26 @@ public DelegatingClassLoader(List<String> pluginPaths) { this(pluginPaths, DelegatingClassLoader.class.getClassLoader()); } + @SuppressWarnings("unchecked") public Set<PluginDesc<Connector>> connectors() { + Set<PluginDesc<Connector>> connectors = new TreeSet<>(); + for (PluginDesc<SinkConnector> sinkConnector : sinkConnectors) { + connectors.add((PluginDesc<Connector>) (PluginDesc<? extends Connector>) sinkConnector); + } + for (PluginDesc<SourceConnector> sourceConnector : sourceConnectors) { + connectors.add((PluginDesc<Connector>) (PluginDesc<? extends Connector>) sourceConnector); + } return connectors; Review comment: ```suggestion Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors); connectors.addAll((Set) sourceConnectors); return connectors; ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -750,4 +755,41 @@ private String trace(Throwable t) { return keys; } + @Override + public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { Review comment: Should it really be called `connectorPluginConfig` when it handles other plugins too? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java ########## @@ -17,30 +17,32 @@ package org.apache.kafka.connect.runtime.rest.entities; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginType; import java.util.Objects; public class ConnectorPluginInfo { Review comment: Should the name remain as `ConnectorPluginInfo` when it's no longer just for connector plugins? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java ########## @@ -17,22 +17,24 @@ package org.apache.kafka.connect.runtime.isolation; import org.apache.kafka.common.config.provider.ConfigProvider; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import java.util.Locale; public enum PluginType { SOURCE(SourceConnector.class), SINK(SinkConnector.class), - CONNECTOR(Connector.class), Review comment: This makes me wonder why CONNECTOR was ever in this enum. Do you know @mimaison / @C0urante ? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java ########## @@ -100,21 +135,23 @@ public ConfigInfos validateConfigs( @GET @Path("/") - public List<ConnectorPluginInfo> listConnectorPlugins() { - return getConnectorPlugins(); - } - - // TODO: improve once plugins are allowed to be added/removed during runtime. - private synchronized List<ConnectorPluginInfo> getConnectorPlugins() { - if (connectorPlugins.isEmpty()) { - for (PluginDesc<Connector> plugin : herder.plugins().connectors()) { - if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) { - connectorPlugins.add(new ConnectorPluginInfo(plugin)); - } + public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) { + synchronized (this) { + if (connectorsOnly) { + Set<String> types = new HashSet<>(Arrays.asList(PluginType.SINK.toString(), PluginType.SOURCE.toString())); + return Collections.unmodifiableList(connectorPlugins.stream().filter(p -> types.contains(p.type())).collect(Collectors.toList())); Review comment: Is it worth using `Set.contains` for this, rather than `.equals` and `||`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org