This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a9efca0bf63 KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module (#13302)
a9efca0bf63 is described below
commit a9efca0bf63110d68f84fc9841d8a31f245e10e0
Author: Greg Harris <[email protected]>
AuthorDate: Wed Aug 16 10:30:24 2023 -0700
KAFKA-14759: Move Mock, Schema, and Verifiable connectors to new test-plugins module (#13302)
Reviewers: Hector Geraldino <[email protected]>, Chris Egerton <[email protected]>
---
build.gradle | 16 +++++++-
.../rest/resources/ConnectorPluginsResource.java | 34 ++++------------
.../resources/ConnectorPluginsResourceTest.java | 10 -----
.../apache/kafka/connect/tools/MockConnector.java | 0
.../kafka/connect/tools/MockSinkConnector.java | 0
.../apache/kafka/connect/tools/MockSinkTask.java | 0
.../kafka/connect/tools/MockSourceConnector.java | 0
.../apache/kafka/connect/tools/MockSourceTask.java | 0
.../kafka/connect/tools/SchemaSourceConnector.java | 0
.../kafka/connect/tools/SchemaSourceTask.java | 0
.../connect/tools/VerifiableSinkConnector.java | 0
.../kafka/connect/tools/VerifiableSinkTask.java | 0
.../connect/tools/VerifiableSourceConnector.java | 0
.../kafka/connect/tools/VerifiableSourceTask.java | 0
.../org.apache.kafka.connect.sink.SinkConnector | 0
...org.apache.kafka.connect.source.SourceConnector | 0
settings.gradle | 1 +
tests/kafkatest/services/connect.py | 45 ++++++++++++----------
18 files changed, 48 insertions(+), 58 deletions(-)
diff --git a/build.gradle b/build.gradle
index 23c8b4b9ee9..1787987133d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -829,6 +829,7 @@ def connectPkgs = [
'connect:file',
'connect:json',
'connect:runtime',
+ 'connect:test-plugins',
'connect:transforms',
'connect:mirror',
'connect:mirror-client'
@@ -2768,7 +2769,6 @@ project(':connect:runtime') {
implementation libs.reflections
implementation libs.mavenArtifact
implementation libs.swaggerAnnotations
- implementation project(':server-common')
// We use this library to generate OpenAPI docs for the REST API, but we
don't want or need it at compile
// or run time. So, we add it to a separate configuration, which we use
later on during docs generation
@@ -2778,6 +2778,8 @@ project(':connect:runtime') {
testImplementation project(':core')
testImplementation project(':metadata')
testImplementation project(':core').sourceSets.test.output
+ testImplementation project(':server-common')
+ testImplementation project(':connect:test-plugins')
testImplementation libs.easymock
testImplementation libs.junitJupiterApi
@@ -3075,6 +3077,18 @@ project(':connect:mirror-client') {
}
}
+project(':connect:test-plugins') {
+ archivesBaseName = "connect-test-plugins"
+
+ dependencies {
+ api project(':connect:api')
+
+ implementation project(':server-common')
+ implementation libs.slf4jApi
+ implementation libs.jacksonDatabind
+ }
+}
+
task aggregatedJavadoc(type: Javadoc, dependsOn: compileJava) {
def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index ad8ff00cb96..947c467ae1a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -26,13 +26,6 @@ import
org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.source.SourceConnector;
-import org.apache.kafka.connect.tools.MockSinkConnector;
-import org.apache.kafka.connect.tools.MockSourceConnector;
-import org.apache.kafka.connect.tools.SchemaSourceConnector;
-import org.apache.kafka.connect.tools.VerifiableSinkConnector;
-import org.apache.kafka.connect.tools.VerifiableSourceConnector;
import org.apache.kafka.connect.util.FutureCallback;
import javax.ws.rs.BadRequestException;
@@ -47,7 +40,6 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -66,34 +58,22 @@ public class ConnectorPluginsResource implements
ConnectResource {
private final List<PluginInfo> connectorPlugins;
private long requestTimeoutMs;
- static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES
= Arrays.asList(
- VerifiableSinkConnector.class,
- MockSinkConnector.class
- );
-
- static final List<Class<? extends SourceConnector>>
SOURCE_CONNECTOR_EXCLUDES = Arrays.asList(
- VerifiableSourceConnector.class,
- MockSourceConnector.class,
- SchemaSourceConnector.class
- );
-
public ConnectorPluginsResource(Herder herder) {
this.herder = herder;
this.connectorPlugins = new ArrayList<>();
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
// TODO: improve once plugins are allowed to be added/removed during
runtime.
- addConnectorPlugins(herder.plugins().sinkConnectors(),
SINK_CONNECTOR_EXCLUDES);
- addConnectorPlugins(herder.plugins().sourceConnectors(),
SOURCE_CONNECTOR_EXCLUDES);
- addConnectorPlugins(herder.plugins().transformations(),
Collections.emptySet());
- addConnectorPlugins(herder.plugins().predicates(),
Collections.emptySet());
- addConnectorPlugins(herder.plugins().converters(),
Collections.emptySet());
- addConnectorPlugins(herder.plugins().headerConverters(),
Collections.emptySet());
+ addConnectorPlugins(herder.plugins().sinkConnectors());
+ addConnectorPlugins(herder.plugins().sourceConnectors());
+ addConnectorPlugins(herder.plugins().transformations());
+ addConnectorPlugins(herder.plugins().predicates());
+ addConnectorPlugins(herder.plugins().converters());
+ addConnectorPlugins(herder.plugins().headerConverters());
}
- private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins,
Collection<Class<? extends T>> excludes) {
+ private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins) {
plugins.stream()
- .filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.forEach(connectorPlugins::add);
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 94304b87810..c39017adc40 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -342,12 +342,8 @@ public class ConnectorPluginsResourceTest {
@Test
public void testListConnectorPlugins() {
- Set<Class<?>> excludes =
Stream.of(ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES)
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
Set<PluginInfo> expectedConnectorPlugins =
Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
.flatMap(Collection::stream)
- .filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new
HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
@@ -390,11 +386,6 @@ public class ConnectorPluginsResourceTest {
@Test
public void testListAllPlugins() {
- Set<Class<?>> excludes = Stream.of(
- ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
- ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
- ).flatMap(Collection::stream)
- .collect(Collectors.toSet());
Set<PluginInfo> expectedConnectorPlugins = Stream.of(
SINK_CONNECTOR_PLUGINS,
SOURCE_CONNECTOR_PLUGINS,
@@ -403,7 +394,6 @@ public class ConnectorPluginsResourceTest {
TRANSFORMATION_PLUGINS,
PREDICATE_PLUGINS
).flatMap(Collection::stream)
- .filter(p -> !excludes.contains(p.pluginClass()))
.map(PluginInfo::new)
.collect(Collectors.toSet());
Set<PluginInfo> actualConnectorPlugins = new
HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSinkTask.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/MockSourceTask.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkTask.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
b/connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
similarity index 100%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
rename to
connect/test-plugins/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceTask.java
diff --git
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
similarity index 100%
rename from
connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
rename to
connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
diff --git
a/connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
b/connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
similarity index 100%
rename from
connect/runtime/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
rename to
connect/test-plugins/src/main/resources/META-INF/services/org.apache.kafka.connect.source.SourceConnector
diff --git a/settings.gradle b/settings.gradle
index 0f4f7e52d94..09cb1b44d82 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -55,6 +55,7 @@ include 'clients',
'connect:mirror',
'connect:mirror-client',
'connect:runtime',
+ 'connect:test-plugins',
'connect:transforms',
'core',
'examples',
diff --git a/tests/kafkatest/services/connect.py
b/tests/kafkatest/services/connect.py
index dce275665ba..2cc93aa5b3b 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -285,26 +285,31 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
self.environment[envvar] = env_opts
- def append_filestream_connectors_to_classpath(self):
+ def maybe_append_filestream_connectors_to_classpath(self):
if self.include_filestream_connectors:
- cwd = os.getcwd()
- self.logger.info("Including filestream connectors when starting
Connect. "
- "Looking for jar locally in: %s" % cwd)
- relative_path = "/connect/file/build/libs/"
- local_dir = cwd + relative_path
- lib_dir = self.path.home() + relative_path
- for pwd, dirs, files in os.walk(local_dir):
- for file in files:
- if file.startswith("connect-file") and
file.endswith(".jar"):
- # Use the expected directory on the node instead of
the path in the driver node
- file_path = lib_dir + file
- self.logger.debug("Appending %s to Connect worker's
CLASSPATH" % file_path)
- return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
- self.logger.info("Jar with filestream connectors was not found
under %s" % lib_dir)
+ return self.append_module_to_classpath("file")
else:
self.logger.info("Starting Connect without filestream connectors
in the CLASSPATH")
+ return ""
- return None
+ def append_test_plugins_to_classpath(self):
+ return self.append_module_to_classpath("test-plugins")
+
+ def append_module_to_classpath(self, module):
+ cwd = os.getcwd()
+ relative_path = "/connect/" + module + "/build/libs/"
+ local_dir = cwd + relative_path
+ lib_dir = self.path.home() + relative_path
+ for pwd, dirs, files in os.walk(local_dir):
+ for file in files:
+ if file.endswith(".jar"):
+ # Use the expected directory on the node instead of the
path in the driver node
+ file_path = lib_dir + file
+ self.logger.info("Appending %s to Connect worker's
CLASSPATH" % file_path)
+ return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+
+ self.logger.info("Jar not found within %s" % local_dir)
+ return ""
class ConnectStandaloneService(ConnectServiceBase):
@@ -327,8 +332,8 @@ class ConnectStandaloneService(ConnectServiceBase):
cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts,
other_kafka_opts)
- classpath = self.append_filestream_connectors_to_classpath()
- cmd += classpath if classpath else ""
+ cmd += self.append_test_plugins_to_classpath()
+ cmd += self.maybe_append_filestream_connectors_to_classpath()
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
@@ -388,8 +393,8 @@ class ConnectDistributedService(ConnectServiceBase):
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
- classpath = self.append_filestream_connectors_to_classpath()
- cmd += classpath if classpath else ""
+ cmd += self.maybe_append_filestream_connectors_to_classpath()
+ cmd += self.append_test_plugins_to_classpath()
cmd += "%s %s " % (self.path.script("connect-distributed.sh", node),
self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE,
self.STDERR_FILE, self.PID_FILE)
return cmd