This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d2edf8de0d KAFKA-17609:[4/4]Convert system tests to kraft part 4 (#17328)
3d2edf8de0d is described below

commit 3d2edf8de0d645376eaccbebee3b15bb467c7f2e
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Oct 30 12:07:16 2024 -0400

    KAFKA-17609:[4/4]Convert system tests to kraft part 4 (#17328)
    
    Part 4 of 4 converting streams system tests to KRaft
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../kafkatest/tests/streams/streams_smoke_test.py  |   4 +-
 .../streams/streams_static_membership_test.py      |  12 +-
 .../tests/streams/streams_upgrade_test.py          | 124 ++-------------------
 3 files changed, 15 insertions(+), 125 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py
index 88846de8949..51f5420f62e 100644
--- a/tests/kafkatest/tests/streams/streams_smoke_test.py
+++ b/tests/kafkatest/tests/streams/streams_smoke_test.py
@@ -49,8 +49,8 @@ class StreamsSmokeTest(KafkaTest):
     @cluster(num_nodes=8)
     @matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
             crash=[True, False],
-            metadata_quorum=quorum.all_non_upgrade)
-    def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
+            metadata_quorum=[quorum.combined_kraft])
+    def test_streams(self, processing_guarantee, crash, metadata_quorum):
         processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, processing_guarantee)
         processor2 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, processing_guarantee)
         processor3 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, processing_guarantee)
diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index 2249f6cc345..3a2f41d3c34 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -19,7 +19,6 @@ from ducktape.tests.test import Test
 from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StaticMemberTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import verify_stopped, stop_processors, 
verify_running, extract_generation_from_logs, extract_generation_id
 
 class StreamsStaticMembershipTest(Test):
@@ -39,13 +38,8 @@ class StreamsStaticMembershipTest(Test):
             self.input_topic: {'partitions': 18},
         }
 
-        self.zookeeper = (
-            ZookeeperService(self.test_context, 1)
-            if quorum.for_test(self.test_context) == quorum.zk
-            else None
-        )
         self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=self.zookeeper, topics=self.topics, 
controller_num_nodes_override=1)
+                                  zk=None, topics=self.topics, 
controller_num_nodes_override=1)
 
         self.producer = VerifiableProducer(self.test_context,
                                            1,
@@ -57,8 +51,6 @@ class StreamsStaticMembershipTest(Test):
     @cluster(num_nodes=8)
     @matrix(metadata_quorum=[quorum.isolated_kraft], 
use_new_coordinator=[True, False])
     def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False):
-        if self.zookeeper:
-            self.zookeeper.start()
         self.kafka.start()
 
         numThreads = 3
@@ -104,8 +96,6 @@ class StreamsStaticMembershipTest(Test):
 
         self.producer.stop()
         self.kafka.stop(timeout_sec=120)
-        if self.zookeeper:
-            self.zookeeper.stop()
 
     def verify_processing(self, processors):
         for processor in processors:
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 7b0ff2f3413..2b37e0c2a4f 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -18,28 +18,22 @@ import time
 from ducktape.mark import matrix, ignore
 from ducktape.mark.resource import cluster
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.streams import StreamsSmokeTestDriverService, 
StreamsSmokeTestJobRunnerService, \
     StreamsUpgradeTestJobRunnerService
-from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, 
extract_generation_id
-from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
+from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, 
LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
     LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, 
LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
     KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() 
(since v2.2.1)
-broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), 
str(LATEST_1_1),
-                           str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), 
str(LATEST_2_3),
-                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7),
-                           str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), 
str(LATEST_3_2),
+broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), 
str(LATEST_3_2),
                            str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), 
str(LATEST_3_6),
                            str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]
 
-metadata_1_versions = [str(LATEST_0_10_0)]
-metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
+metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), 
str(LATEST_1_0), str(LATEST_1_1),
                        str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8),
                        str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3)]
 # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 
(unreleased) and 3.4.0
@@ -111,102 +105,10 @@ class StreamsUpgradeTest(Test):
             node.version = KafkaVersion(to_version)
             self.kafka.start_node(node)
 
-    @ignore
     @cluster(num_nodes=6)
-    @matrix(from_version=broker_upgrade_versions, 
to_version=broker_upgrade_versions)
-    def test_upgrade_downgrade_brokers(self, from_version, to_version):
-        """
-        Start a smoke test client then perform rolling upgrades on the broker.
-        """
-
-        if from_version == to_version:
-            return
-
-        self.replication = 3
-        self.num_kafka_nodes = 3
-        self.partitions = 1
-        self.isr = 2
-        self.topics = {
-            'echo' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                       'configs': {"min.insync.replicas": self.isr}},
-            'data' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                       'configs': {"min.insync.replicas": self.isr} },
-            'min' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'max' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'sum' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'dif' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'cnt' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'avg' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                      'configs': {"min.insync.replicas": self.isr} },
-            'wcnt' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                       'configs': {"min.insync.replicas": self.isr} },
-            'tagg' : { 'partitions': self.partitions, 'replication-factor': 
self.replication,
-                       'configs': {"min.insync.replicas": self.isr} }
-        }
-
-        # Setup phase
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        # number of nodes needs to be >= 3 for the smoke test
-        self.kafka = KafkaService(self.test_context, 
num_nodes=self.num_kafka_nodes,
-                                  zk=self.zk, 
version=KafkaVersion(from_version), topics=self.topics)
-        self.kafka.start()
-
-        # allow some time for topics to be created
-        wait_until(lambda: 
self.confirm_topics_on_all_brokers(set(self.topics.keys())),
-                   timeout_sec=60,
-                   err_msg="Broker did not create all topics in 60 seconds ")
-
-        self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
-
-        processor = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, "at_least_once")
-
-        with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as 
driver_monitor:
-            self.driver.start()
-
-            with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
-                processor.start()
-                monitor.wait_until(self.processed_data_msg,
-                                   timeout_sec=60,
-                                   err_msg="Never saw output '%s' on " % 
self.processed_data_msg + str(processor.node))
-
-            connected_message = "Discovered group coordinator"
-            with processor.node.account.monitor_log(processor.LOG_FILE) as 
log_monitor:
-                with processor.node.account.monitor_log(processor.STDOUT_FILE) 
as stdout_monitor:
-                    self.perform_broker_upgrade(to_version)
-
-                    log_monitor.wait_until(connected_message,
-                                           timeout_sec=120,
-                                           err_msg=("Never saw output '%s' on 
" % connected_message) + str(processor.node.account))
-
-                    stdout_monitor.wait_until(self.processed_data_msg,
-                                              timeout_sec=60,
-                                              err_msg="Never saw output '%s' 
on" % self.processed_data_msg + str(processor.node.account))
-
-            # SmokeTestDriver allows up to 6 minutes to consume all
-            # records for the verification step so this timeout is set to
-            # 6 minutes (360 seconds) for consuming of verification records
-            # and a very conservative additional 2 minutes (120 seconds) to 
process
-            # the records in the verification step
-            
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
-                                      timeout_sec=480,
-                                      err_msg="Never saw output '%s' on" % 
'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + 
str(self.driver.node.account))
-
-        self.driver.stop()
-        processor.stop()
-        processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" 
% processor.STDOUT_FILE, allow_fail=False)
-
-    @cluster(num_nodes=6)
-    @matrix(from_version=metadata_1_versions)
-    @matrix(from_version=metadata_2_versions)
-    @matrix(from_version=fk_join_versions)
-    def test_rolling_upgrade_with_2_bounces(self, from_version):
+    @matrix(from_version=metadata_2_versions, 
metadata_quorum=[quorum.combined_kraft])
+    @matrix(from_version=fk_join_versions, 
metadata_quorum=[quorum.combined_kraft])
+    def test_rolling_upgrade_with_2_bounces(self, from_version, 
metadata_quorum):
         """
         This test verifies that the cluster successfully upgrades despite 
changes in the metadata and FK
         join protocols.
@@ -245,7 +147,8 @@ class StreamsUpgradeTest(Test):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    def test_version_probing_upgrade(self):
+    @matrix(metadata_quorum=[quorum.combined_kraft])
+    def test_version_probing_upgrade(self, metadata_quorum):
         """
         Starts 3 KafkaStreams instances, and upgrades one-by-one to "future 
version"
         """
@@ -272,8 +175,8 @@ class StreamsUpgradeTest(Test):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)],  upgrade=[True, 
False])
-    def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
+    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)],  upgrade=[True, 
False], metadata_quorum=[quorum.combined_kraft])
+    def test_upgrade_downgrade_state_updater(self, from_version, upgrade, 
metadata_quorum):
         """
         Starts 3 KafkaStreams instances, and enables / disables state 
restoration
         for the instances in a rolling bounce.
@@ -312,10 +215,7 @@ class StreamsUpgradeTest(Test):
         self.stop_and_await()
 
     def set_up_services(self):
-        self.zk = ZookeeperService(self.test_context, num_nodes=1)
-        self.zk.start()
-
-        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, 
topics=self.topics)
+        self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, 
topics=self.topics)
         self.kafka.start()
 
         self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)

Reply via email to