This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a9efca0bf63 KAFKA-14759: Move Mock, Schema, and Verifiable connectors 
to new test-plugins module (#13302)
a9efca0bf63 is described below

commit a9efca0bf63110d68f84fc9841d8a31f245e10e0
Author: Greg Harris <[email protected]>
AuthorDate: Wed Aug 16 10:30:24 2023 -0700

    KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new 
test-plugins module (#13302)
    
    Reviewers: Hector Geraldino <[email protected]>, Chris Egerton 
<[email protected]>
---
 build.gradle                                       | 16 +++++++-
 .../rest/resources/ConnectorPluginsResource.java   | 34 ++++------------
 .../resources/ConnectorPluginsResourceTest.java    | 10 -----
 .../apache/kafka/connect/tools/MockConnector.java  |  0
 .../kafka/connect/tools/MockSinkConnector.java     |  0
 .../apache/kafka/connect/tools/MockSinkTask.java   |  0
 .../kafka/connect/tools/MockSourceConnector.java   |  0
 .../apache/kafka/connect/tools/MockSourceTask.java |  0
 .../kafka/connect/tools/SchemaSourceConnector.java |  0
 .../kafka/connect/tools/SchemaSourceTask.java      |  0
 .../connect/tools/VerifiableSinkConnector.java     |  0
 .../kafka/connect/tools/VerifiableSinkTask.java    |  0
 .../connect/tools/VerifiableSourceConnector.java   |  0
 .../kafka/connect/tools/VerifiableSourceTask.java  |  0
 .../org.apache.kafka.connect.sink.SinkConnector    |  0
 ...org.apache.kafka.connect.source.SourceConnector |  0
 settings.gradle                                    |  1 +
 tests/kafkatest/services/connect.py                | 45 ++++++++++++----------
 18 files changed, 48 insertions(+), 58 deletions(-)

diff --git a/build.gradle b/build.gradle
index 23c8b4b9ee9..1787987133d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -829,6 +829,7 @@ def connectPkgs = [
     'connect:file',
     'connect:json',
     'connect:runtime',
+    'connect:test-plugins',
     'connect:transforms',
     'connect:mirror',
     'connect:mirror-client'
@@ -2768,7 +2769,6 @@ project(':connect:runtime') {
     implementation libs.reflections
     implementation libs.mavenArtifact
     implementation libs.swaggerAnnotations
-    implementation project(':server-common')
 
     // We use this library to generate OpenAPI docs for the REST API, but we 
don't want or need it at compile
     // or run time. So, we add it to a separate configuration, which we use 
later on during docs generation
@@ -2778,6 +2778,8 @@ project(':connect:runtime') {
     testImplementation project(':core')
     testImplementation project(':metadata')
     testImplementation project(':core').sourceSets.test.output
+    testImplementation project(':server-common')
+    testImplementation project(':connect:test-plugins')
 
     testImplementation libs.easymock
     testImplementation libs.junitJupiterApi
@@ -3075,6 +3077,18 @@ project(':connect:mirror-client') {
   }
 }
 
+project(':connect:test-plugins') {
+  archivesBaseName = "connect-test-plugins"
+
+  dependencies {
+    api project(':connect:api')
+
+    implementation project(':server-common')
+    implementation libs.slf4jApi
+    implementation libs.jacksonDatabind
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index ad8ff00cb96..947c467ae1a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -26,13 +26,6 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.tools.MockSinkConnector;
-import org.apache.kafka.connect.tools.MockSourceConnector;
-import org.apache.kafka.connect.tools.SchemaSourceConnector;
-import org.apache.kafka.connect.tools.VerifiableSinkConnector;
-import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import javax.ws.rs.BadRequestException;
@@ -47,7 +40,6 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -66,34 +58,22 @@ public class ConnectorPluginsResource implements 
ConnectResource {
     private final List<PluginInfo> connectorPlugins;
     private long requestTimeoutMs;
 
-    static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES 
= Arrays.asList(
-            VerifiableSinkConnector.class,
-            MockSinkConnector.class
-    );
-
-    static final List<Class<? extends SourceConnector>> 
SOURCE_CONNECTOR_EXCLUDES = Arrays.asList(
-            VerifiableSourceConnector.class,
-            MockSourceConnector.class,
-            SchemaSourceConnector.class
-    );
-
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
         this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         // TODO: improve once plugins are allowed to be added/removed during 
runtime.
-        addConnectorPlugins(herder.plugins().sinkConnectors(), 
SINK_CONNECTOR_EXCLUDES);
-        addConnectorPlugins(herder.plugins().sourceConnectors(), 
SOURCE_CONNECTOR_EXCLUDES);
-        addConnectorPlugins(herder.plugins().transformations(), 
Collections.emptySet());
-        addConnectorPlugins(herder.plugins().predicates(), 
Collections.emptySet());
-        addConnectorPlugins(herder.plugins().converters(), 
Collections.emptySet());
-        addConnectorPlugins(herder.plugins().headerConverters(), 
Collections.emptySet());
+        addConnectorPlugins(herder.plugins().sinkConnectors());
+        addConnectorPlugins(herder.plugins().sourceConnectors());
+        addConnectorPlugins(herder.plugins().transformations());
+        addConnectorPlugins(herder.plugins().predicates());
+        addConnectorPlugins(herder.plugins().converters());
+        addConnectorPlugins(herder.plugins().headerConverters());
     }
 
-    private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins, 
Collection<Class<? extends T>> excludes) {
+    private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins) {
         plugins.stream()
-                .filter(p -> !excludes.contains(p.pluginClass()))
                 .map(PluginInfo::new)
                 .forEach(connectorPlugins::add);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 94304b87810..c39017adc40 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -342,12 +342,8 @@ public class ConnectorPluginsResourceTest {
 
     @Test
     public void testListConnectorPlugins() {
-        Set<Class<?>> excludes = 
Stream.of(ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, 
ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toSet());
         Set<PluginInfo> expectedConnectorPlugins = 
Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
                 .flatMap(Collection::stream)
-                .filter(p -> !excludes.contains(p.pluginClass()))
                 .map(PluginInfo::new)
                 .collect(Collectors.toSet());
         Set<PluginInfo> actualConnectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
@@ -390,11 +386,6 @@ public class ConnectorPluginsResourceTest {
 
     @Test
     public void testListAllPlugins() {
-        Set<Class<?>> excludes = Stream.of(
-                        ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
-                        ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
-                ).flatMap(Collection::stream)
-                .collect(Collectors.toSet());
         Set<PluginInfo> expectedConnectorPlugins = Stream.of(
                         SINK_CONNECTOR_PLUGINS,
                         SOURCE_CONNECTOR_PLUGINS,
@@ -403,7 +394,6 @@ public class ConnectorPluginsResourceTest {
                         TRANSFORMATION_PLUGINS,
                         PREDICATE_PLUGINS
                 ).flatMap(Collection::stream)
-                .filter(p -> !excludes.contains(p.pluginClass()))
                 .map(PluginInfo::new)
                 .collect(Collectors.toSet());
         Set<PluginInfo> actualConnectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
 
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
similarity index 100%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
rename to 
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
 
b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
similarity index 100%
rename from 
connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
rename to 
connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
diff --git 
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
 
b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
similarity index 100%
rename from 
connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
rename to 
connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
diff --git a/settings.gradle b/settings.gradle
index 0f4f7e52d94..09cb1b44d82 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -55,6 +55,7 @@ include 'clients',
     'connect:mirror',
     'connect:mirror-client',
     'connect:runtime',
+    'connect:test-plugins',
     'connect:transforms',
     'core',
     'examples',
diff --git a/tests/kafkatest/services/connect.py 
b/tests/kafkatest/services/connect.py
index dce275665ba..2cc93aa5b3b 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -285,26 +285,31 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
             env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
         self.environment[envvar] = env_opts
 
-    def append_filestream_connectors_to_classpath(self):
+    def maybe_append_filestream_connectors_to_classpath(self):
         if self.include_filestream_connectors:
-            cwd = os.getcwd()
-            self.logger.info("Including filestream connectors when starting 
Connect. "
-                             "Looking for jar locally in: %s" % cwd)
-            relative_path = "/connect/file/build/libs/"
-            local_dir = cwd + relative_path
-            lib_dir = self.path.home() + relative_path
-            for pwd, dirs, files in os.walk(local_dir):
-                for file in files:
-                    if file.startswith("connect-file") and 
file.endswith(".jar"):
-                        # Use the expected directory on the node instead of 
the path in the driver node
-                        file_path = lib_dir + file
-                        self.logger.debug("Appending %s to Connect worker's 
CLASSPATH" % file_path)
-                        return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
-            self.logger.info("Jar with filestream connectors was not found 
under %s" % lib_dir)
+            return self.append_module_to_classpath("file")
         else:
             self.logger.info("Starting Connect without filestream connectors 
in the CLASSPATH")
+        return ""
 
-        return None
+    def append_test_plugins_to_classpath(self):
+        return self.append_module_to_classpath("test-plugins")
+
+    def append_module_to_classpath(self, module):
+        cwd = os.getcwd()
+        relative_path = "/connect/" + module + "/build/libs/"
+        local_dir = cwd + relative_path
+        lib_dir = self.path.home() + relative_path
+        for pwd, dirs, files in os.walk(local_dir):
+            for file in files:
+                if file.endswith(".jar"):
+                    # Use the expected directory on the node instead of the 
path in the driver node
+                    file_path = lib_dir + file
+                    self.logger.info("Appending %s to Connect worker's 
CLASSPATH" % file_path)
+                    return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+
+        self.logger.info("Jar not found within %s" % local_dir)
+        return ""
 
 
 class ConnectStandaloneService(ConnectServiceBase):
@@ -327,8 +332,8 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         cmd += fix_opts_for_new_jvm(node)
         cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts, 
other_kafka_opts)
-        classpath = self.append_filestream_connectors_to_classpath()
-        cmd += classpath if classpath else ""
+        cmd += self.append_test_plugins_to_classpath()
+        cmd += self.maybe_append_filestream_connectors_to_classpath()
 
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
@@ -388,8 +393,8 @@ class ConnectDistributedService(ConnectServiceBase):
         for envvar in self.environment:
             cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
 
-        classpath = self.append_filestream_connectors_to_classpath()
-        cmd += classpath if classpath else ""
+        cmd += self.maybe_append_filestream_connectors_to_classpath()
+        cmd += self.append_test_plugins_to_classpath()
         cmd += "%s %s " % (self.path.script("connect-distributed.sh", node), 
self.CONFIG_FILE)
         cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, 
self.STDERR_FILE, self.PID_FILE)
         return cmd

Reply via email to