This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 05601f4 KAFKA-13748: Do not include file stream connectors in
Connect's CLASSPATH and plugin.path by default (#11908)
05601f4 is described below
commit 05601f42cdbb67c6608e37c6f965608fd926d07f
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Wed Mar 30 13:15:42 2022 -0700
KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH
and plugin.path by default (#11908)
With this change, the non-production-grade connectors that are meant to be used
for demos and quick starts are no longer included by default in the CLASSPATH
and plugin.path of Connect deployments. The package containing these connectors
will still be shipped with the Apache Kafka distribution and will remain
available for explicit inclusion.
The changes have been tested with the system tests as well as the existing unit
and integration tests.
Reviewers: Mickael Maison <[email protected]>, Randall Hauch
<[email protected]>
---
bin/kafka-run-class.sh | 4 +--
docs/connect.html | 1 +
docs/quickstart.html | 4 +--
tests/kafkatest/services/connect.py | 42 +++++++++++++++++++---
.../tests/connect/connect_distributed_test.py | 12 ++++---
tests/kafkatest/tests/connect/connect_rest_test.py | 3 +-
tests/kafkatest/tests/connect/connect_test.py | 12 ++++---
7 files changed, 59 insertions(+), 19 deletions(-)
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 6167583..490f930 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -32,7 +32,7 @@ if [ -z "$INCLUDE_TEST_JARS" ]; then
fi
# Exclude jars not necessary for running commands.
-regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc)$"
+regex="(-(test|test-sources|src|scaladoc|javadoc)\.jar|jar.asc|connect-file.*\.jar)$"
should_include_file() {
if [ "$INCLUDE_TEST_JARS" = true ]; then
return 0
@@ -171,7 +171,7 @@ do
CLASSPATH="$CLASSPATH:$dir/*"
done
-for cc_pkg in "api" "transforms" "runtime" "file" "mirror" "mirror-client"
"json" "tools" "basic-auth-extension"
+for cc_pkg in "api" "transforms" "runtime" "mirror" "mirror-client" "json"
"tools" "basic-auth-extension"
do
for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
do
diff --git a/docs/connect.html b/docs/connect.html
index 07f8778..f1b4da1 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -49,6 +49,7 @@
<li><code>bootstrap.servers</code> - List of Kafka servers used to
bootstrap connections to Kafka</li>
<li><code>key.converter</code> - Converter class used to convert
between Kafka Connect format and the serialized form that is written to Kafka.
This controls the format of the keys in messages written to or read from Kafka,
and since this is independent of connectors it allows any connector to work
with any serialization format. Examples of common formats include JSON and
Avro.</li>
<li><code>value.converter</code> - Converter class used to convert
between Kafka Connect format and the serialized form that is written to Kafka.
This controls the format of the values in messages written to or read from
Kafka, and since this is independent of connectors it allows any connector to
work with any serialization format. Examples of common formats include JSON and
Avro.</li>
+ <li><code>plugin.path</code> (default <code>empty</code>) - a list of
paths that contain Connect plugins (connectors, converters, transformations).
Before running quick starts, users must add the absolute path that contains the
example FileStreamSourceConnector and FileStreamSinkConnector packaged in
<code>connect-file-"version".jar</code>, because these connectors are not
included by default to the <code>CLASSPATH</code> or the
<code>plugin.path</code> of the Connect worker (see [...]
</ul>
<p>The important configuration options specific to standalone mode are:</p>
diff --git a/docs/quickstart.html b/docs/quickstart.html
index 2bb6fb6..c5669c4 100644
--- a/docs/quickstart.html
+++ b/docs/quickstart.html
@@ -32,8 +32,8 @@
the latest Kafka release and extract it:
</p>
- <pre class="line-numbers"><code class="language-bash">$ tar -xzf
kafka_2.13-3.1.1.tgz
-$ cd kafka_2.13-3.1.1</code></pre>
+ <pre class="line-numbers"><code class="language-bash">$ tar -xzf
kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz
+$ cd kafka_{{scalaVersion}}-{{fullDotVersion}}</code></pre>
</div>
<div class="quickstart-step">
diff --git a/tests/kafkatest/services/connect.py
b/tests/kafkatest/services/connect.py
index 26c0d92..41c33cc 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -69,7 +69,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, files, startup_timeout_sec =
60):
+ def __init__(self, context, num_nodes, kafka, files,
startup_timeout_sec=60,
+ include_filestream_connectors=False):
super(ConnectServiceBase, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
@@ -78,6 +79,8 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
self.startup_timeout_sec = startup_timeout_sec
self.environment = {}
self.external_config_template_func = None
+ self.include_filestream_connectors = include_filestream_connectors
+ self.logger.debug("include_filestream_connectors % s",
include_filestream_connectors)
def pids(self, node):
"""Return process ids for Kafka Connect processes."""
@@ -279,12 +282,34 @@ class ConnectServiceBase(KafkaPathResolverMixin, Service):
env_opts = "\"%s %s\"" % (env_opts.strip('\"'), value)
self.environment[envvar] = env_opts
+ def append_filestream_connectors_to_classpath(self):
+ if self.include_filestream_connectors:
+ cwd = os.getcwd()
+ self.logger.info("Including filestream connectors when starting
Connect. "
+ "Looking for jar locally in: %s" % cwd)
+ relative_path = "/connect/file/build/libs/"
+ local_dir = cwd + relative_path
+ lib_dir = self.path.home() + relative_path
+ for pwd, dirs, files in os.walk(local_dir):
+ for file in files:
+ if file.startswith("connect-file") and
file.endswith(".jar"):
+ # Use the expected directory on the node instead of
the path in the driver node
+ file_path = lib_dir + file
+ self.logger.debug("Appending %s to Connect worker's
CLASSPATH" % file_path)
+ return "export CLASSPATH=${CLASSPATH}:%s; " % file_path
+ self.logger.info("Jar with filestream connectors was not found
under %s" % lib_dir)
+ else:
+ self.logger.info("Starting Connect without filestream connectors
in the CLASSPATH")
+
+ return None
+
class ConnectStandaloneService(ConnectServiceBase):
"""Runs Kafka Connect in standalone mode."""
- def __init__(self, context, kafka, files, startup_timeout_sec = 60):
- super(ConnectStandaloneService, self).__init__(context, 1, kafka,
files, startup_timeout_sec)
+ def __init__(self, context, kafka, files, startup_timeout_sec=60,
include_filestream_connectors=False):
+ super(ConnectStandaloneService, self).__init__(context, 1, kafka,
files, startup_timeout_sec,
+
include_filestream_connectors)
# For convenience since this service only makes sense with a single node
@property
@@ -299,6 +324,9 @@ class ConnectStandaloneService(ConnectServiceBase):
cmd += fix_opts_for_new_jvm(node)
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts,
other_kafka_opts)
+ classpath = self.append_filestream_connectors_to_classpath()
+ cmd += classpath if classpath else ""
+
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
cmd += "%s %s " % (self.path.script("connect-standalone.sh", node),
self.CONFIG_FILE)
@@ -339,8 +367,9 @@ class ConnectDistributedService(ConnectServiceBase):
"""Runs Kafka Connect in distributed mode."""
def __init__(self, context, num_nodes, kafka, files,
offsets_topic="connect-offsets",
- configs_topic="connect-configs",
status_topic="connect-status", startup_timeout_sec = 60):
- super(ConnectDistributedService, self).__init__(context, num_nodes,
kafka, files, startup_timeout_sec)
+ configs_topic="connect-configs",
status_topic="connect-status", startup_timeout_sec=60,
+ include_filestream_connectors=False):
+ super(ConnectDistributedService, self).__init__(context, num_nodes,
kafka, files, startup_timeout_sec, include_filestream_connectors)
self.startup_mode = self.STARTUP_MODE_JOIN
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
@@ -355,6 +384,9 @@ class ConnectDistributedService(ConnectServiceBase):
cmd += "export KAFKA_OPTS=\"%s %s\"; " % (heap_kafka_opts,
other_kafka_opts)
for envvar in self.environment:
cmd += "export %s=%s; " % (envvar, str(self.environment[envvar]))
+
+ classpath = self.append_filestream_connectors_to_classpath()
+ cmd += classpath if classpath else ""
cmd += "%s %s " % (self.path.script("connect-distributed.sh", node),
self.CONFIG_FILE)
cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE,
self.STDERR_FILE, self.PID_FILE)
return cmd
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 6bc52b0..970779f 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -80,7 +80,7 @@ class ConnectDistributedTest(Test):
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True
- def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT,
timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False):
+ def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT,
timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False,
include_filestream_connectors=False):
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
security_protocol=security_protocol,
interbroker_security_protocol=security_protocol,
topics=self.topics, version=broker_version,
@@ -89,7 +89,8 @@ class ConnectDistributedTest(Test):
for node in self.kafka.nodes:
node.config[config_property.MESSAGE_TIMESTAMP_TYPE] =
timestamp_type
- self.cc = ConnectDistributedService(self.test_context, 3, self.kafka,
[self.INPUT_FILE, self.OUTPUT_FILE])
+ self.cc = ConnectDistributedService(self.test_context, 3, self.kafka,
[self.INPUT_FILE, self.OUTPUT_FILE],
+
include_filestream_connectors=include_filestream_connectors)
self.cc.log_level = "DEBUG"
self.zk.start()
@@ -370,7 +371,7 @@ class ConnectDistributedTest(Test):
"""
self.CONNECT_PROTOCOL = connect_protocol
- self.setup_services(security_protocol=security_protocol)
+ self.setup_services(security_protocol=security_protocol,
include_filestream_connectors=True)
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
self.cc.start()
@@ -522,7 +523,7 @@ class ConnectDistributedTest(Test):
@matrix(connect_protocol=['sessioned', 'compatible', 'eager'])
def test_transformations(self, connect_protocol):
self.CONNECT_PROTOCOL = connect_protocol
- self.setup_services(timestamp_type='CreateTime')
+ self.setup_services(timestamp_type='CreateTime',
include_filestream_connectors=True)
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
self.cc.start()
@@ -610,7 +611,8 @@ class ConnectDistributedTest(Test):
or relies upon the broker to auto-create the topics (v0.10.0.x and
before).
"""
self.CONNECT_PROTOCOL = connect_protocol
- self.setup_services(broker_version=KafkaVersion(broker_version),
auto_create_topics=auto_create_topics, security_protocol=security_protocol)
+ self.setup_services(broker_version=KafkaVersion(broker_version),
auto_create_topics=auto_create_topics,
+ security_protocol=security_protocol,
include_filestream_connectors=True)
self.cc.set_configs(lambda node:
self.render("connect-distributed.properties", node=node))
self.cc.start()
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py
b/tests/kafkatest/tests/connect/connect_rest_test.py
index 4d978a2..ff44d94 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -73,7 +73,8 @@ class ConnectRestApiTest(KafkaTest):
'test': {'partitions': 1, 'replication-factor': 1}
})
- self.cc = ConnectDistributedService(test_context, 2, self.kafka,
[self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE])
+ self.cc = ConnectDistributedService(test_context, 2, self.kafka,
[self.INPUT_FILE, self.INPUT_FILE2, self.OUTPUT_FILE],
+ include_filestream_connectors=True)
@cluster(num_nodes=4)
@matrix(connect_protocol=['compatible', 'eager'])
diff --git a/tests/kafkatest/tests/connect/connect_test.py
b/tests/kafkatest/tests/connect/connect_test.py
index 1a7f6ab..4c2a91a 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -91,8 +91,10 @@ class ConnectStandaloneFileTest(Test):
security_protocol=security_protocol,
interbroker_security_protocol=security_protocol,
topics=self.topics,
controller_num_nodes_override=self.num_zk)
- self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE])
- self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE])
+ self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE],
+
include_filestream_connectors=True)
+ self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE],
+
include_filestream_connectors=True)
self.consumer_validator = ConsoleConsumer(self.test_context, 1,
self.kafka, self.TOPIC_TEST,
consumer_timeout_ms=10000)
@@ -164,8 +166,10 @@ class ConnectStandaloneFileTest(Test):
else:
faulty_records = faulty_records[0]
- self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE])
- self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE])
+ self.source = ConnectStandaloneService(self.test_context, self.kafka,
[self.INPUT_FILE, self.OFFSETS_FILE],
+
include_filestream_connectors=True)
+ self.sink = ConnectStandaloneService(self.test_context, self.kafka,
[self.OUTPUT_FILE, self.OFFSETS_FILE],
+
include_filestream_connectors=True)
self.zk.start()
self.kafka.start()