This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 440e0b8801b KAFKA-17923 Remove old kafka version from e2e (#17673)
440e0b8801b is described below
commit 440e0b8801bca5a648a2cc2a7e98f25c44b0810d
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Nov 11 06:22:06 2024 +0800
KAFKA-17923 Remove old kafka version from e2e (#17673)
Reviewers: Chia-Ping Tsai <[email protected]>
---
tests/docker/Dockerfile | 16 ------
tests/kafkatest/directory_layout/kafka_path.py | 7 +--
.../sanity_checks/test_console_consumer.py | 23 --------
.../sanity_checks/test_performance_services.py | 15 ++---
.../sanity_checks/test_verifiable_producer.py | 21 +------
tests/kafkatest/services/console_consumer.py | 39 +++----------
.../services/kafka/templates/kafka.properties | 5 --
tests/kafkatest/services/kafka/util.py | 11 +---
tests/kafkatest/services/monitor/jmx.py | 7 +--
.../services/performance/consumer_performance.py | 56 +++++-------------
.../services/performance/end_to_end_latency.py | 24 +-------
.../services/performance/producer_performance.py | 4 --
tests/kafkatest/services/verifiable_client.py | 14 +----
tests/kafkatest/services/verifiable_consumer.py | 6 +-
.../client/client_compatibility_features_test.py | 7 +--
.../client_compatibility_produce_consume_test.py | 7 +--
tests/kafkatest/tests/client/quota_test.py | 16 +-----
.../tests/connect/connect_distributed_test.py | 2 +-
.../core/compatibility_test_new_broker_test.py | 17 +-----
.../streams_cooperative_rebalance_upgrade_test.py | 7 +--
.../tests/streams/streams_upgrade_test.py | 6 +-
tests/kafkatest/version.py | 67 ----------------------
tests/unit/directory_layout/check_project_paths.py | 12 ++--
tests/unit/version/check_version.py | 6 +-
vagrant/base.sh | 18 ------
25 files changed, 60 insertions(+), 353 deletions(-)
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
index aefb500ef0e..198ffde1b7c 100644
--- a/tests/docker/Dockerfile
+++ b/tests/docker/Dockerfile
@@ -77,15 +77,6 @@ RUN echo 'PermitUserEnvironment yes' >> /etc/ssh/sshd_config
# Install binary test dependencies.
# we use the same versions as in vagrant/base.sh
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
-RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s
"$KAFKA_MIRROR/kafka_2.11-0.8.2.2.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.8.2.2"
-RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s
"$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.9.0.1"
-RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl
-s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.10.0.1"
-RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl
-s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.10.1.1"
-RUN mkdir -p "/opt/kafka-0.10.2.2" && chmod a+rw /opt/kafka-0.10.2.2 && curl
-s "$KAFKA_MIRROR/kafka_2.11-0.10.2.2.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.10.2.2"
-RUN mkdir -p "/opt/kafka-0.11.0.3" && chmod a+rw /opt/kafka-0.11.0.3 && curl
-s "$KAFKA_MIRROR/kafka_2.11-0.11.0.3.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-0.11.0.3"
-RUN mkdir -p "/opt/kafka-1.0.2" && chmod a+rw /opt/kafka-1.0.2 && curl -s
"$KAFKA_MIRROR/kafka_2.11-1.0.2.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-1.0.2"
-RUN mkdir -p "/opt/kafka-1.1.1" && chmod a+rw /opt/kafka-1.1.1 && curl -s
"$KAFKA_MIRROR/kafka_2.11-1.1.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-1.1.1"
-RUN mkdir -p "/opt/kafka-2.0.1" && chmod a+rw /opt/kafka-2.0.1 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.0.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-2.0.1"
RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.1.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-2.1.1"
RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-2.2.2"
RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s
"$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C
"/opt/kafka-2.3.1"
@@ -106,13 +97,6 @@ RUN mkdir -p "/opt/kafka-3.8.1" && chmod a+rw
/opt/kafka-3.8.1 && curl -s "$KAFK
# Streams test dependencies
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o
/opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o
/opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.2-test.jar" -o
/opt/kafka-0.10.2.2/libs/kafka-streams-0.10.2.2-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.3-test.jar" -o
/opt/kafka-0.11.0.3/libs/kafka-streams-0.11.0.3-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.0.2-test.jar" -o
/opt/kafka-1.0.2/libs/kafka-streams-1.0.2-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-1.1.1-test.jar" -o
/opt/kafka-1.1.1/libs/kafka-streams-1.1.1-test.jar
-RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.0.1-test.jar" -o
/opt/kafka-2.0.1/libs/kafka-streams-2.0.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o
/opt/kafka-2.1.1/libs/kafka-streams-2.1.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o
/opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o
/opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar
diff --git a/tests/kafkatest/directory_layout/kafka_path.py
b/tests/kafkatest/directory_layout/kafka_path.py
index 1e3d0034e4c..90a84d44af5 100644
--- a/tests/kafkatest/directory_layout/kafka_path.py
+++ b/tests/kafkatest/directory_layout/kafka_path.py
@@ -16,7 +16,7 @@
import importlib
import os
-from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH,
LATEST_0_9, LATEST_3_5
+from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH, LATEST_3_5
"""This module serves a few purposes:
@@ -55,11 +55,6 @@ JARS = {
# This version of the file connectors does not contain ServiceLoader
manifests
LATEST_3_5.__str__(): {
CONNECT_FILE_JAR: "libs/connect-file*.jar"
- },
- # TODO: This is only used in 0.8.2.x system tests, remove with KAFKA-14762
- LATEST_0_9.__str__(): {
- TOOLS_JAR_NAME: "libs/kafka-tools*.jar",
- TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME: "libs/{argparse4j,jackson}*.jar"
}
}
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py
b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 675920ade8f..9a271232541 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -22,10 +22,8 @@ from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils.remote_account import line_count, file_exists
-from kafkatest.version import LATEST_0_8_2
class ConsoleConsumerTest(Test):
@@ -77,24 +75,3 @@ class ConsoleConsumerTest(Test):
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
-
- @cluster(num_nodes=4)
- def test_version(self):
- """Check that console consumer v0.8.2.X successfully starts and
consumes messages."""
- self.kafka.start()
-
- num_messages = 1000
- self.producer = VerifiableProducer(self.test_context, num_nodes=1,
kafka=self.kafka, topic=self.topic,
- max_messages=num_messages,
throughput=1000)
- self.producer.start()
- self.producer.wait()
-
- self.consumer.nodes[0].version = LATEST_0_8_2
- self.consumer.new_consumer = False
- self.consumer.consumer_timeout_ms = 1000
- self.consumer.start()
- self.consumer.wait()
-
- num_consumed = len(self.consumer.messages_consumed[1])
- num_produced = self.producer.num_acked
- assert num_produced == num_consumed, "num_produced: %d, num_consumed:
%d" % (num_produced, num_consumed)
diff --git a/tests/kafkatest/sanity_checks/test_performance_services.py
b/tests/kafkatest/sanity_checks/test_performance_services.py
index f00ec492d4e..27e732edfb5 100644
--- a/tests/kafkatest/sanity_checks/test_performance_services.py
+++ b/tests/kafkatest/sanity_checks/test_performance_services.py
@@ -21,7 +21,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.performance import ProducerPerformanceService,
ConsumerPerformanceService, EndToEndLatencyService
from kafkatest.services.performance import latency,
compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9,
LATEST_1_1, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_2_1, KafkaVersion
class PerformanceServiceTest(Test):
@@ -38,15 +38,8 @@ class PerformanceServiceTest(Test):
self.zk.start()
@cluster(num_nodes=5)
- # We are keeping 0.8.2 here so that we don't inadvertently break support
for it. Since this is just a sanity check,
- # the overhead should be manageable.
- @parametrize(version=str(LATEST_0_8_2), new_consumer=False)
- @parametrize(version=str(LATEST_0_9), new_consumer=False)
- @parametrize(version=str(LATEST_0_9))
- @parametrize(version=str(LATEST_1_1), new_consumer=False)
- @cluster(num_nodes=5)
- @matrix(version=[str(DEV_BRANCH)], metadata_quorum=quorum.all)
- def test_version(self, version=str(LATEST_0_9), new_consumer=True,
metadata_quorum=quorum.zk):
+ @matrix(version=[str(LATEST_2_1), str(DEV_BRANCH)],
metadata_quorum=quorum.all)
+ def test_version(self, version=str(LATEST_2_1), metadata_quorum=quorum.zk):
"""
Sanity check out producer performance service - verify that we can run
the service with a small
number of messages. The actual stats here are pretty meaningless since
the number of messages is quite small.
@@ -80,7 +73,7 @@ class PerformanceServiceTest(Test):
# check basic run of consumer performance service
self.consumer_perf = ConsumerPerformanceService(
- self.test_context, 1, self.kafka, new_consumer=new_consumer,
+ self.test_context, 1, self.kafka,
topic=self.topic, version=version, messages=self.num_records)
self.consumer_perf.group = "test-consumer-group"
self.consumer_perf.run()
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 152e9697293..9bd365bee64 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0,
LATEST_0_10_1, DEV_BRANCH, KafkaVersion
+from kafkatest.version import DEV_BRANCH, KafkaVersion
class TestVerifiableProducer(Test):
@@ -45,10 +45,6 @@ class TestVerifiableProducer(Test):
self.zk.start()
@cluster(num_nodes=3)
- @parametrize(producer_version=str(LATEST_0_8_2))
- @parametrize(producer_version=str(LATEST_0_9))
- @parametrize(producer_version=str(LATEST_0_10_0))
- @parametrize(producer_version=str(LATEST_0_10_1))
@matrix(producer_version=[str(DEV_BRANCH)], acks=["0", "1", "-1"],
enable_idempotence=[False])
@matrix(producer_version=[str(DEV_BRANCH)], acks=["-1"],
enable_idempotence=[True])
@matrix(producer_version=[str(DEV_BRANCH)],
security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all)
@@ -81,20 +77,7 @@ class TestVerifiableProducer(Test):
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
err_msg="Producer failed to start in a reasonable amount of
time.")
- # using version.vstring (distutils.version.LooseVersion) is a tricky
way of ensuring
- # that this check works with DEV_BRANCH
- # When running VerifiableProducer 0.8.X, both the current branch
version and 0.8.X should show up because of the
- # way verifiable producer pulls in some development directories into
its classpath
- #
- # If the test fails here because 'ps .. | grep' couldn't find the
process it means
- # the login and grep that is_version() performs is slower than
- # the time it takes the producer to produce its messages.
- # Easy fix is to decrease throughput= above, the good fix is to make
the producer
- # not terminate until explicitly killed in this case.
- if node.version <= LATEST_0_8_2:
- assert is_version(node, [node.version.vstring,
LATEST_0_9.vstring], logger=self.logger)
- else:
- assert is_version(node, [node.version.vstring], logger=self.logger)
+ assert is_version(node, [node.version.vstring], logger=self.logger)
self.producer.wait()
num_produced = self.producer.num_acked
diff --git a/tests/kafkatest/services/console_consumer.py
b/tests/kafkatest/services/console_consumer.py
index fb87f20df19..cc8a0d31997 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -21,7 +21,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin, JmxTool
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9,
LATEST_0_10_0, V_0_10_0_0, V_0_11_0_0, V_2_0_0, LATEST_3_7
+from kafkatest.version import DEV_BRANCH, LATEST_3_7
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
"""
@@ -118,9 +118,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
self.isolation_level = isolation_level
self.enable_systest_events = enable_systest_events
- if self.enable_systest_events:
- # Only available in 0.10.0 and up
- assert version >= V_0_10_0_0
self.print_timestamp = print_timestamp
self.jaas_override_variables = jaas_override_variables or {}
@@ -134,10 +131,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
"""Return a string which can be used to create a configuration file
appropriate for the given node."""
# Process client configuration
prop_file = self.render('console_consumer.properties')
- if hasattr(node, "version") and node.version <= LATEST_0_8_2:
- # in 0.8.2.X and earlier, console consumer does not have
--timeout-ms option
- # instead, we have to pass it through the config file
- prop_file += "\nconsumer.timeout.ms=%s\n" %
str(self.consumer_timeout_ms)
# Add security properties to the config. If security protocol is not
specified,
# use the default in the template properties.
@@ -176,19 +169,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
"%(console_consumer)s " \
"--topic %(topic)s " \
"--consumer.config %(config_file)s " % args
-
- if self.new_consumer:
- assert node.version.consumer_supports_bootstrap_server(), \
- "new_consumer is only supported if version >= 0.9.0.0, version
%s" % str(node.version)
- if node.version <= LATEST_0_10_0:
- cmd += " --new-consumer"
- cmd += " --bootstrap-server %(broker_list)s" % args
- if node.version >= V_0_11_0_0:
- cmd += " --isolation-level %s" % self.isolation_level
- else:
- assert node.version < V_2_0_0, \
- "new_consumer==false is only supported if version < 2.0.0,
version %s" % str(node.version)
- cmd += " --zookeeper %(zk_connect)s" % args
+ cmd += " --bootstrap-server %(broker_list)s" % args
+ cmd += " --isolation-level %s" % self.isolation_level
if self.from_beginning:
cmd += " --from-beginning"
@@ -196,8 +178,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
if self.consumer_timeout_ms is not None:
# version 0.8.X and below do not support --timeout-ms option
# This will be added in the properties file instead
- if node.version > LATEST_0_8_2:
- cmd += " --timeout-ms %s" % self.consumer_timeout_ms
+ cmd += " --timeout-ms %s" % self.consumer_timeout_ms
if self.print_timestamp:
cmd += " --property print.timestamp=true"
@@ -209,16 +190,12 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin,
BackgroundThreadService)
cmd += " --property print.partition=true"
# LoggingMessageFormatter was introduced after 0.9
- if node.version > LATEST_0_9:
- if node.version > LATEST_3_7:
- cmd += " --formatter
org.apache.kafka.tools.consumer.LoggingMessageFormatter"
- else:
- cmd += " --formatter kafka.tools.LoggingMessageFormatter"
+ if node.version > LATEST_3_7:
+ cmd += " --formatter
org.apache.kafka.tools.consumer.LoggingMessageFormatter"
+ else:
+ cmd += " --formatter kafka.tools.LoggingMessageFormatter"
if self.enable_systest_events:
- # enable systest events is only available in 0.10.0 and later
- # check the assertion here as well, in case node.version has been
modified
- assert node.version >= V_0_10_0_0
cmd += " --enable-systest-events"
if self.consumer_properties is not None:
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 21b60afeb83..1fd305a17df 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -41,12 +41,7 @@ listener.security.protocol.map={{
listener_security_protocol_map }}
{% if quorum_info.using_zk or quorum_info.has_brokers %}
advertised.host.name={{ node.account.hostname }}
advertised.listeners={{ advertised_listeners }}
-
-{% if node.version.supports_named_listeners() %}
inter.broker.listener.name={{ interbroker_listener.name }}
-{% else %}
-security.inter.broker.protocol={{ interbroker_listener.security_protocol }}
-{% endif %}
{% endif %}
{% for k, v in listener_security_config.client_listener_overrides.items() %}
diff --git a/tests/kafkatest/services/kafka/util.py
b/tests/kafkatest/services/kafka/util.py
index de6b85ff3c1..0965fd9d4e4 100644
--- a/tests/kafkatest/services/kafka/util.py
+++ b/tests/kafkatest/services/kafka/util.py
@@ -16,12 +16,9 @@
from collections import namedtuple
from kafkatest.utils.remote_account import java_version
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0,
LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
-new_jdk_not_supported = frozenset([str(LATEST_0_8_2), str(LATEST_0_9),
str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0),
str(LATEST_1_0)])
-
def fix_opts_for_new_jvm(node):
# Startup scripts for early versions of Kafka contains options
# that not supported on latest versions of JVM like -XX:+PrintGCDateStamps
or -XX:UseParNewGC.
@@ -31,12 +28,6 @@ def fix_opts_for_new_jvm(node):
if java_ver <= 9:
return ""
- cmd = ""
- # check kafka version for kafka node types
- if hasattr(node, 'version'):
- if node.version == LATEST_0_8_2 or node.version == LATEST_0_9 or
node.version == LATEST_0_10_0 or node.version == LATEST_0_10_1 or node.version
== LATEST_0_10_2 or node.version == LATEST_0_11_0 or node.version == LATEST_1_0:
- cmd += "export
KAFKA_GC_LOG_OPTS=\"-Xlog:gc*:file=kafka-gc.log:time,tags:filecount=10,filesize=102400\";
"
- cmd += "export KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15
-Djava.awt.headless=true\"; "
- return cmd
+ return ""
diff --git a/tests/kafkatest/services/monitor/jmx.py
b/tests/kafkatest/services/monitor/jmx.py
index b326c20aa3b..99a604d888a 100644
--- a/tests/kafkatest/services/monitor/jmx.py
+++ b/tests/kafkatest/services/monitor/jmx.py
@@ -19,7 +19,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.version import get_version, V_0_11_0_0, V_3_4_0, DEV_BRANCH
+from kafkatest.version import get_version, V_3_4_0, DEV_BRANCH
class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their
worker nodes and collect jmx stats.
@@ -139,10 +139,7 @@ class JmxMixin(object):
# To correctly wait for requested JMX metrics to be added we need the
--wait option for JmxTool. This option was
# not added until 0.11.0.1, so any earlier versions need to use
JmxTool from a newer version.
version = get_version(node)
- if version <= V_0_11_0_0:
- return DEV_BRANCH
- else:
- return version
+ return version
def jmx_class_name(self, version):
if version <= V_3_4_0:
diff --git a/tests/kafkatest/services/performance/consumer_performance.py
b/tests/kafkatest/services/performance/consumer_performance.py
index 34e2e0d05cd..eea91cbfd90 100644
--- a/tests/kafkatest/services/performance/consumer_performance.py
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -18,8 +18,7 @@ import os
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
from kafkatest.services.performance import PerformanceService
-from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, V_2_0_0, LATEST_0_10_0
+from kafkatest.version import V_2_5_0, DEV_BRANCH
class ConsumerPerformanceService(PerformanceService):
@@ -65,25 +64,14 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True}
}
- def __init__(self, context, num_nodes, kafka, topic, messages,
version=DEV_BRANCH, new_consumer=True, settings={}):
+ def __init__(self, context, num_nodes, kafka, topic, messages,
version=DEV_BRANCH, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
self.topic = topic
self.messages = messages
- self.new_consumer = new_consumer
self.settings = settings
- assert version.consumer_supports_bootstrap_server() or (not
new_consumer), \
- "new_consumer is only supported if version >= 0.9.0.0, version %s"
% str(version)
-
- assert version < V_2_0_0 or new_consumer, \
- "new_consumer==false is only supported if version < 2.0.0, version
%s" % str(version)
-
- security_protocol = self.security_config.security_protocol
- assert version.consumer_supports_bootstrap_server() or
security_protocol == SecurityConfig.PLAINTEXT, \
- "Security protocol %s is only supported if version >= 0.9.0.0,
version %s" % (self.security_config, str(version))
-
# These less-frequently used settings can be updated manually after
instantiation
self.fetch_size = None
self.socket_buffer_size = None
@@ -97,17 +85,13 @@ class ConsumerPerformanceService(PerformanceService):
"""Dictionary of arguments used to start the Consumer Performance
script."""
args = {
'topic': self.topic,
- 'messages': self.messages,
+ 'messages': self.messages
}
- if self.new_consumer:
- if version <= LATEST_0_10_0:
- args['new-consumer'] = ""
- args['broker-list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
- else:
- args['bootstrap-server'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
+ if version < V_2_5_0:
+ args['broker-list'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
- args['zookeeper'] = self.kafka.zk_connect_setting()
+ args['bootstrap-server'] =
self.kafka.bootstrap_servers(self.security_config.security_protocol)
if self.fetch_size is not None:
args['fetch-size'] = self.fetch_size
@@ -132,9 +116,7 @@ class ConsumerPerformanceService(PerformanceService):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)
- if node.version.consumer_supports_bootstrap_server():
- # This is only used for security settings
- cmd += " --consumer.config %s" %
ConsumerPerformanceService.CONFIG_FILE
+ cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
@@ -143,22 +125,6 @@ class ConsumerPerformanceService(PerformanceService):
'stderr':
ConsumerPerformanceService.STDERR_CAPTURE}
return cmd
- def parse_results(self, line, version):
- parts = line.split(',')
- if version.consumer_supports_bootstrap_server():
- result = {
- 'total_mb': float(parts[2]),
- 'mbps': float(parts[3]),
- 'records_per_sec': float(parts[5]),
- }
- else:
- result = {
- 'total_mb': float(parts[3]),
- 'mbps': float(parts[4]),
- 'records_per_sec': float(parts[6]),
- }
- return result
-
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" %
ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
@@ -174,4 +140,10 @@ class ConsumerPerformanceService(PerformanceService):
last = line
# Parse and save the last line's information
- self.results[idx-1] = self.parse_results(last, node.version)
+ if last is not None:
+ parts = last.split(',')
+ self.results[idx-1] = {
+ 'total_mb': float(parts[2]),
+ 'mbps': float(parts[3]),
+ 'records_per_sec': float(parts[5]),
+ }
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py
b/tests/kafkatest/services/performance/end_to_end_latency.py
index 5e66b06104c..e7e0100e511 100644
--- a/tests/kafkatest/services/performance/end_to_end_latency.py
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -53,14 +53,6 @@ class EndToEndLatencyService(PerformanceService):
self.security_config = kafka.security_config.client_config()
self.version = ''
- security_protocol = self.security_config.security_protocol
-
- if not version.consumer_supports_bootstrap_server():
- assert security_protocol == SecurityConfig.PLAINTEXT, \
- "Security protocol %s is only supported if version >= 0.9.0.0,
version %s" % (self.security_config, str(version))
- assert compression_type == "none", \
- "Compression type %s is only supported if version >= 0.9.0.0,
version %s" % (compression_type, str(version))
-
self.args = {
'topic': topic,
'num_records': num_records,
@@ -82,20 +74,11 @@ class EndToEndLatencyService(PerformanceService):
'kafka_run_class': self.path.script("kafka-run-class.sh", node),
'java_class_name': self.java_class_name()
})
- if not node.version.consumer_supports_bootstrap_server():
- args.update({
- 'zk_connect': self.kafka.zk_connect_setting(),
- })
cmd = fix_opts_for_new_jvm(node)
cmd += "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " %
EndToEndLatencyService.LOG4J_CONFIG
- if node.version.consumer_supports_bootstrap_server():
- cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s
%(java_class_name)s " % args
- cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d
%(message_bytes)d %(config_file)s" % args
- else:
- # Set fetch max wait to 0 to match behavior in later versions
- cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s
kafka.tools.TestEndToEndLatency " % args
- cmd += "%(bootstrap_servers)s %(zk_connect)s %(topic)s
%(num_records)d 0 %(acks)d" % args
+ cmd += "KAFKA_OPTS=%(kafka_opts)s %(kafka_run_class)s
%(java_class_name)s " % args
+ cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d
%(message_bytes)d %(config_file)s" % args
cmd += " 2>> %(stderr)s | tee -a %(stdout)s" % {'stdout':
EndToEndLatencyService.STDOUT_CAPTURE,
'stderr':
EndToEndLatencyService.STDERR_CAPTURE}
@@ -109,8 +92,7 @@ class EndToEndLatencyService(PerformanceService):
node.account.create_file(EndToEndLatencyService.LOG4J_CONFIG,
log_config)
client_config = str(self.security_config)
- if node.version.consumer_supports_bootstrap_server():
- client_config += "compression_type=%(compression_type)s" %
self.args
+ client_config += "compression_type=%(compression_type)s" % self.args
node.account.create_file(EndToEndLatencyService.CONFIG_FILE,
client_config)
self.security_config.setup_node(node)
diff --git a/tests/kafkatest/services/performance/producer_performance.py
b/tests/kafkatest/services/performance/producer_performance.py
index e0f00061cb9..acb0aec8650 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -55,10 +55,6 @@ class ProducerPerformanceService(HttpMetricsCollector,
PerformanceService):
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
- security_protocol = self.security_config.security_protocol
- assert version.consumer_supports_bootstrap_server() or
security_protocol == SecurityConfig.PLAINTEXT, \
- "Security protocol %s is only supported if version >= 0.9.0.0,
version %s" % (self.security_config, str(version))
-
self.args = {
'topic': topic,
'kafka_opts': self.security_config.kafka_opts,
diff --git a/tests/kafkatest/services/verifiable_client.py
b/tests/kafkatest/services/verifiable_client.py
index 649a428c37d..4971136a64e 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from kafkatest.directory_layout.kafka_path import TOOLS_JAR_NAME,
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME
-from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9
from ducktape.cluster.remoteaccount import RemoteCommandError
import importlib
@@ -249,17 +247,7 @@ class VerifiableClientJava (VerifiableClient):
def exec_cmd (self, node):
""" :return: command to execute to start instance
Translates Verifiable* to the corresponding Java client class name """
- cmd = ""
- if self.java_class_name == 'VerifiableProducer' and node.version <=
LATEST_0_8_2:
- # 0.8.2.X releases do not have VerifiableProducer.java, so cheat
and add
- # the tools jar from 0.9.x to the classpath
- # TODO remove with KAFKA-14762
- tools_jar = self.parent.path.jar(TOOLS_JAR_NAME, LATEST_0_9)
- tools_dependant_libs_jar =
self.parent.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, LATEST_0_9)
- cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " %
tools_jar
- cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " %
tools_dependant_libs_jar
- cmd += "export CLASSPATH; "
- cmd += fix_opts_for_new_jvm(node)
+ cmd = fix_opts_for_new_jvm(node)
cmd += self.parent.path.script("kafka-run-class.sh", node) + "
org.apache.kafka.tools." + self.java_class_name
return cmd
diff --git a/tests/kafkatest/services/verifiable_consumer.py
b/tests/kafkatest/services/verifiable_consumer.py
index a62dc047842..7e81ca1f7ce 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -21,7 +21,7 @@ from ducktape.services.background_thread import
BackgroundThreadService
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.verifiable_client import VerifiableClientMixin
-from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0,
V_0_10_0_0, V_4_0_0
+from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_4_0_0
class ConsumerState:
@@ -317,10 +317,6 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
"Version %s does not support static membership (must be 2.3 or
higher)" % str(node.version)
node.group_instance_id = self.group_id + "-instance-" + str(idx)
- if self.assignment_strategy:
- assert node.version >= V_0_10_0_0, \
- "Version %s does not setting an assignment strategy (must be
0.10.0 or higher)" % str(node.version)
-
cmd = self.start_cmd(node)
self.logger.debug("VerifiableConsumer %d command: %s" % (idx, cmd))
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py
b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 7fe31d7336f..d0bcd80a791 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -26,8 +26,8 @@ from ducktape.tests.test import TestContext
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5,
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
+from kafkatest.version import DEV_BRANCH, \
+ LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6,
LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
def get_broker_features(broker_version):
@@ -107,9 +107,6 @@ class ClientCompatibilityFeaturesTest(Test):
@cluster(num_nodes=7)
@matrix(broker_version=[str(DEV_BRANCH)],
metadata_quorum=quorum.all_non_upgrade)
- @parametrize(broker_version=str(LATEST_1_0))
- @parametrize(broker_version=str(LATEST_1_1))
- @parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_3))
diff --git
a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
index 3ff3c2ba753..74bd5563200 100644
--- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
@@ -23,8 +23,8 @@ from kafkatest.services.verifiable_producer import
VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
-from kafkatest.version import DEV_BRANCH, LATEST_1_0, LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5,
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
+from kafkatest.version import DEV_BRANCH, \
+ LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6,
LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@@ -58,9 +58,6 @@ class
ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@cluster(num_nodes=9)
@matrix(broker_version=[str(DEV_BRANCH)],
metadata_quorum=quorum.all_non_upgrade)
- @parametrize(broker_version=str(LATEST_1_0))
- @parametrize(broker_version=str(LATEST_1_1))
- @parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_3))
diff --git a/tests/kafkatest/tests/client/quota_test.py
b/tests/kafkatest/tests/client/quota_test.py
index 08a23ecc4f6..d52f9b6a944 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -21,7 +21,7 @@ from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.version import DEV_BRANCH, LATEST_1_1
+from kafkatest.version import DEV_BRANCH
class QuotaConfig(object):
CLIENT_ID = 'client-id'
@@ -129,24 +129,14 @@ class QuotaTest(Test):
@cluster(num_nodes=5)
@matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER,
QuotaConfig.USER_CLIENT], override_quota=[True, False])
@parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
- @parametrize(quota_type=QuotaConfig.CLIENT_ID,
old_broker_throttling_behavior=True)
- @parametrize(quota_type=QuotaConfig.CLIENT_ID,
old_client_throttling_behavior=True)
- def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1,
- old_broker_throttling_behavior=False,
old_client_throttling_behavior=False):
- # Old (pre-2.0) throttling behavior for broker throttles before
sending a response to the client.
- if old_broker_throttling_behavior:
- self.kafka.set_version(LATEST_1_1)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1):
self.kafka.start()
self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)
producer_client_id = self.quota_config.client_id
consumer_client_id = self.quota_config.client_id
- # Old (pre-2.0) throttling behavior for client does not throttle upon
receiving a response with a non-zero throttle time.
- if old_client_throttling_behavior:
- client_version = LATEST_1_1
- else:
- client_version = DEV_BRANCH
+ client_version = DEV_BRANCH
# Produce all messages
producer = ProducerPerformanceService(
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py
b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 64a80d2483e..e54118c3881 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.kafka import KafkaService,
config_property, quorum, cons
from kafkatest.services.connect import ConnectDistributedService,
ConnectServiceBase, VerifiableSource, VerifiableSink, ConnectRestError,
MockSink, MockSource
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1,
LATEST_2_0, LATEST_1_1, LATEST_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_2_3, LATEST_2_2, LATEST_2_1,
KafkaVersion
from functools import reduce
from collections import Counter, namedtuple
diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
index 35edaffc64d..c7f600a0f3e 100644
--- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
+++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
@@ -21,8 +21,7 @@ from kafkatest.services.verifiable_producer import
VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0,
LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \
- LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3,
LATEST_2_4, LATEST_2_5, LATEST_2_6, \
+from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4,
LATEST_2_5, LATEST_2_6, \
LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3,
LATEST_3_4, LATEST_3_5, LATEST_3_6, \
LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
@@ -48,8 +47,7 @@ class
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@cluster(num_nodes=6)
@matrix(producer_version=[str(DEV_BRANCH)],
consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]],
timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(DEV_BRANCH)],
consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]],
timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
- @parametrize(producer_version=str(DEV_BRANCH),
consumer_version=str(LATEST_0_9), compression_types=["none"],
new_consumer=False, timestamp_type=None)
- @matrix(producer_version=[str(DEV_BRANCH)],
consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+ @matrix(producer_version=[str(DEV_BRANCH)],
consumer_version=[str(LATEST_2_1)], compression_types=[["snappy"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_2)],
consumer_version=[str(LATEST_2_2)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_3)],
consumer_version=[str(LATEST_2_3)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_4)],
consumer_version=[str(LATEST_2_4)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@@ -67,17 +65,6 @@ class
ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@matrix(producer_version=[str(LATEST_3_7)],
consumer_version=[str(LATEST_3_7)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_8)],
consumer_version=[str(LATEST_3_8)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)],
consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_2_0)],
consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_1_1)],
consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_1_0)],
consumer_version=[str(LATEST_1_0)], compression_types=[["none"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_11_0)],
consumer_version=[str(LATEST_0_11_0)], compression_types=[["gzip"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_10_2)],
consumer_version=[str(LATEST_0_10_2)], compression_types=[["lz4"]],
timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_10_1)],
consumer_version=[str(LATEST_0_10_1)], compression_types=[["snappy"]],
timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_10_0)],
consumer_version=[str(LATEST_0_10_0)], compression_types=[["snappy"]],
timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_9)],
consumer_version=[str(DEV_BRANCH)], compression_types=[["none"]],
timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_9)],
consumer_version=[str(DEV_BRANCH)], compression_types=[["snappy"]],
timestamp_type=[None], metadata_quorum=quorum.all_non_upgrade)
- @matrix(producer_version=[str(LATEST_0_9)],
consumer_version=[str(LATEST_0_9)], compression_types=[["snappy"]],
timestamp_type=[str("LogAppendTime")], metadata_quorum=quorum.all_non_upgrade)
- @parametrize(producer_version=str(LATEST_0_8_2),
consumer_version=str(LATEST_0_8_2), compression_types=["none"],
new_consumer=False, timestamp_type=None)
def test_compatibility(self, producer_version, consumer_version,
compression_types, new_consumer=True, timestamp_type=None,
metadata_quorum=quorum.zk):
if not new_consumer and metadata_quorum != quorum.zk:
raise Exception("ZooKeeper-based consumers are not supported when
using a KRaft metadata quorum")
diff --git
a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
index 992ad587923..a478f11f340 100644
---
a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
+++
b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
@@ -19,8 +19,7 @@ from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2,
LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3
+from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3
from kafkatest.services.streams import CooperativeRebalanceUpgradeService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors,
verify_running
@@ -44,9 +43,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
second_bounce_phase = "second_bounce_phase-"
# !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED
- streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_2),
str(LATEST_0_11_0),
- str(LATEST_1_0),
str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2),
- str(LATEST_2_3)]
+ streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1),
str(LATEST_2_2), str(LATEST_2_3)]
def __init__(self, test_context):
super(StreamsCooperativeRebalanceUpgradeTest,
self).__init__(test_context)
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 2b37e0c2a4f..759e4156920 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -22,8 +22,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService,
StreamsSmokeTestJobRunnerService, \
StreamsUpgradeTestJobRunnerService
from kafkatest.tests.streams.utils import extract_generation_from_logs,
extract_generation_id
-from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0,
LATEST_1_1, \
- LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5,
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
+from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4,
LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5,
LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
KafkaVersion
@@ -33,8 +32,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0),
str(LATEST_3_1), st
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5),
str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]
-metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0),
str(LATEST_1_0), str(LATEST_1_1),
- str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
str(LATEST_2_7), str(LATEST_2_8),
+metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3)]
# upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3
(unreleased) and 3.4.0
# -> https://issues.apache.org/jira/browse/KAFKA-14646
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 56db6431030..00bcf536f84 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -62,21 +62,6 @@ class KafkaVersion(LooseVersion):
return LooseVersion._cmp(self, other)
- def consumer_supports_bootstrap_server(self):
- """
- Kafka supported a new consumer beginning with v0.9.0 where
- we can specify --bootstrap-server instead of --zookeeper.
-
- This version also allowed a --consumer-config file where we could
specify
- a security protocol other than PLAINTEXT.
-
- :return: true if the version of Kafka supports a new consumer with
--bootstrap-server
- """
- return self >= V_0_9_0_0
-
- def supports_named_listeners(self):
- return self >= V_0_10_2_0
-
def acl_command_supports_bootstrap_server(self):
return self >= V_2_1_0
@@ -127,58 +112,6 @@ DEV_VERSION = KafkaVersion("4.0.0-SNAPSHOT")
# This should match the LATEST_PRODUCTION version defined in
MetadataVersion.java
LATEST_STABLE_METADATA_VERSION = "4.0-IV0"
-# 0.8.2.x versions
-V_0_8_2_1 = KafkaVersion("0.8.2.1")
-V_0_8_2_2 = KafkaVersion("0.8.2.2")
-LATEST_0_8_2 = V_0_8_2_2
-
-# 0.9.0.x versions
-V_0_9_0_0 = KafkaVersion("0.9.0.0")
-V_0_9_0_1 = KafkaVersion("0.9.0.1")
-LATEST_0_9 = V_0_9_0_1
-
-# 0.10.0.x versions
-V_0_10_0_0 = KafkaVersion("0.10.0.0")
-V_0_10_0_1 = KafkaVersion("0.10.0.1")
-LATEST_0_10_0 = V_0_10_0_1
-
-# 0.10.1.x versions
-V_0_10_1_0 = KafkaVersion("0.10.1.0")
-V_0_10_1_1 = KafkaVersion("0.10.1.1")
-LATEST_0_10_1 = V_0_10_1_1
-
-# 0.10.2.x versions
-V_0_10_2_0 = KafkaVersion("0.10.2.0")
-V_0_10_2_1 = KafkaVersion("0.10.2.1")
-V_0_10_2_2 = KafkaVersion("0.10.2.2")
-LATEST_0_10_2 = V_0_10_2_2
-
-LATEST_0_10 = LATEST_0_10_2
-
-# 0.11.0.x versions
-V_0_11_0_0 = KafkaVersion("0.11.0.0")
-V_0_11_0_1 = KafkaVersion("0.11.0.1")
-V_0_11_0_2 = KafkaVersion("0.11.0.2")
-V_0_11_0_3 = KafkaVersion("0.11.0.3")
-LATEST_0_11_0 = V_0_11_0_3
-LATEST_0_11 = LATEST_0_11_0
-
-# 1.0.x versions
-V_1_0_0 = KafkaVersion("1.0.0")
-V_1_0_1 = KafkaVersion("1.0.1")
-V_1_0_2 = KafkaVersion("1.0.2")
-LATEST_1_0 = V_1_0_2
-
-# 1.1.x versions
-V_1_1_0 = KafkaVersion("1.1.0")
-V_1_1_1 = KafkaVersion("1.1.1")
-LATEST_1_1 = V_1_1_1
-
-# 2.0.x versions
-V_2_0_0 = KafkaVersion("2.0.0")
-V_2_0_1 = KafkaVersion("2.0.1")
-LATEST_2_0 = V_2_0_1
-
# 2.1.x versions
V_2_1_0 = KafkaVersion("2.1.0")
V_2_1_1 = KafkaVersion("2.1.1")
diff --git a/tests/unit/directory_layout/check_project_paths.py
b/tests/unit/directory_layout/check_project_paths.py
index b9b76f13276..06cf6a4d7dd 100644
--- a/tests/unit/directory_layout/check_project_paths.py
+++ b/tests/unit/directory_layout/check_project_paths.py
@@ -16,7 +16,7 @@
from kafkatest.directory_layout.kafka_path import create_path_resolver,
KafkaSystemTestPathResolver, \
KAFKA_PATH_RESOLVER_KEY
-from kafkatest.version import V_0_9_0_1, DEV_BRANCH, KafkaVersion
+from kafkatest.version import V_2_1_0, DEV_BRANCH, KafkaVersion
class DummyContext(object):
@@ -64,9 +64,9 @@ class CheckCreatePathResolver(object):
"""Check expected paths when using versions."""
resolver = create_path_resolver(DummyContext())
- assert resolver.home(V_0_9_0_1) == "/opt/kafka-0.9.0.1"
- assert resolver.bin(V_0_9_0_1) == "/opt/kafka-0.9.0.1/bin"
- assert resolver.script("kafka-run-class.sh", V_0_9_0_1) ==
"/opt/kafka-0.9.0.1/bin/kafka-run-class.sh"
+ assert resolver.home(V_2_1_0) == "/opt/kafka-2.1.0"
+ assert resolver.bin(V_2_1_0) == "/opt/kafka-2.1.0/bin"
+ assert resolver.script("kafka-run-class.sh", V_2_1_0) ==
"/opt/kafka-2.1.0/bin/kafka-run-class.sh"
def check_node_or_version_helper(self):
"""KafkaSystemTestPathResolver has a helper method which can take a
node or version, and returns the version.
@@ -79,8 +79,8 @@ class CheckCreatePathResolver(object):
assert resolver._version(node) == DEV_BRANCH
# Node with version attribute should resolve to the version attribute
- node.version = V_0_9_0_1
- assert resolver._version(node) == V_0_9_0_1
+ node.version = V_2_1_0
+ assert resolver._version(node) == V_2_1_0
# A KafkaVersion object should resolve to itself
assert resolver._version(DEV_BRANCH) == DEV_BRANCH
diff --git a/tests/unit/version/check_version.py
b/tests/unit/version/check_version.py
index 8cf8e9a06e7..04148962812 100644
--- a/tests/unit/version/check_version.py
+++ b/tests/unit/version/check_version.py
@@ -15,7 +15,7 @@
from mock import Mock
-from kafkatest.version import DEV_BRANCH, V_0_8_2_2, get_version
+from kafkatest.version import DEV_BRANCH, V_2_1_0, get_version
class CheckVersion(object):
@@ -29,5 +29,5 @@ class CheckVersion(object):
assert get_version(node) == DEV_BRANCH
node = Mock()
- node.version = V_0_8_2_2
- assert get_version(node) == V_0_8_2_2
\ No newline at end of file
+ node.version = V_2_1_0
+ assert get_version(node) == V_2_1_0
\ No newline at end of file
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 958b9c7d7a8..d57a2d223a4 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -114,24 +114,6 @@ apt-get install -y iperf traceroute
# We want to use the latest Scala version per Kafka version
# Previously we could not pull in Scala 2.12 builds, because Scala 2.12
requires Java 8 and we were running the system
# tests with Java 7. We have since switched to Java 8, so 2.0.0 and later use
Scala 2.12.
-get_kafka 0.8.2.2 2.11
-chmod a+rw /opt/kafka-0.8.2.2
-get_kafka 0.9.0.1 2.11
-chmod a+rw /opt/kafka-0.9.0.1
-get_kafka 0.10.0.1 2.11
-chmod a+rw /opt/kafka-0.10.0.1
-get_kafka 0.10.1.1 2.11
-chmod a+rw /opt/kafka-0.10.1.1
-get_kafka 0.10.2.2 2.11
-chmod a+rw /opt/kafka-0.10.2.2
-get_kafka 0.11.0.3 2.11
-chmod a+rw /opt/kafka-0.11.0.3
-get_kafka 1.0.2 2.11
-chmod a+rw /opt/kafka-1.0.2
-get_kafka 1.1.1 2.11
-chmod a+rw /opt/kafka-1.1.1
-get_kafka 2.0.1 2.12
-chmod a+rw /opt/kafka-2.0.1
get_kafka 2.1.1 2.12
chmod a+rw /opt/kafka-2.1.1
get_kafka 2.2.2 2.12