This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14ea1cf61a1 KAFKA-19202: Enable KIP-1071 in
streams_broker_bounce_test.py (#19584)
14ea1cf61a1 is described below
commit 14ea1cf61a10a8310a8a364cec0bbfbe67dbe51f
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Apr 29 18:11:46 2025 +0200
KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)
Enable KIP-1071 in the next system test.
Reviewers: Bill Bejeck <[email protected]>
---
.../tests/streams/streams_broker_bounce_test.py | 36 ++++++++++++----------
1 file changed, 20 insertions(+), 16 deletions(-)
diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
index dc61e5fa37c..e9cd1315cf5 100644
--- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py
@@ -149,10 +149,10 @@ class StreamsBrokerBounceTest(Test):
return True
- def setup_system(self, start_processor=True, num_threads=3):
+ def setup_system(self, start_processor=True, num_threads=3,
group_protocol='classic'):
# Setup phase
-
- self.kafka = KafkaService(self.test_context,
num_nodes=self.replication, zk=None, topics=self.topics)
+ use_streams_groups = True if group_protocol == 'streams' else False
+ self.kafka = KafkaService(self.test_context,
num_nodes=self.replication, zk=None, topics=self.topics,
use_streams_groups=use_streams_groups)
self.kafka.start()
# allow some time for topics to be created
@@ -162,7 +162,7 @@ class StreamsBrokerBounceTest(Test):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context,
self.kafka)
- self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context,
self.kafka, "at_least_once", num_threads)
+ self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context,
self.kafka, "at_least_once", group_protocol=group_protocol, num_threads =
num_threads)
self.driver.start()
@@ -207,15 +207,16 @@ class StreamsBrokerBounceTest(Test):
broker_type=["leader"],
num_threads=[1, 3],
sleep_time_secs=[120],
- metadata_quorum=[quorum.combined_kraft])
- def test_broker_type_bounce(self, failure_mode, broker_type,
sleep_time_secs, num_threads, metadata_quorum):
+ metadata_quorum=[quorum.combined_kraft],
+ group_protocol=["classic", "streams"])
+ def test_broker_type_bounce(self, failure_mode, broker_type,
sleep_time_secs, num_threads, metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill one particular broker and ensure
data is still received
Record if records are delivered.
We also add a single thread stream client to make sure we could get
all partitions reassigned in
next generation so to verify the partition lost is correctly triggered.
"""
- self.setup_system(num_threads=num_threads)
+ self.setup_system(num_threads=num_threads,
group_protocol=group_protocol)
# Sleep to allow test to run for a bit
time.sleep(sleep_time_secs)
@@ -230,14 +231,15 @@ class StreamsBrokerBounceTest(Test):
@matrix(failure_mode=["clean_shutdown"],
broker_type=["controller"],
sleep_time_secs=[0],
- metadata_quorum=[quorum.combined_kraft])
- def test_broker_type_bounce_at_start(self, failure_mode, broker_type,
sleep_time_secs, metadata_quorum):
+ metadata_quorum=[quorum.combined_kraft],
+ group_protocol=["classic", "streams"])
+ def test_broker_type_bounce_at_start(self, failure_mode, broker_type,
sleep_time_secs, metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill one particular broker immediately
before streams stats
Streams should throw an exception since it cannot create topics with
the desired
replication factor of 3
"""
- self.setup_system(start_processor=False)
+ self.setup_system(start_processor=False, group_protocol=group_protocol)
# Sleep to allow test to run for a bit
time.sleep(sleep_time_secs)
@@ -252,13 +254,14 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=10)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce",
"hard_bounce"],
num_failures=[2],
- metadata_quorum=[quorum.isolated_kraft])
- def test_many_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum):
+ metadata_quorum=[quorum.isolated_kraft],
+ group_protocol=["classic", "streams"])
+ def test_many_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill a few brokers and ensure data is
still received
Record if records are delivered
"""
- self.setup_system()
+ self.setup_system(group_protocol=group_protocol)
# Sleep to allow test to run for a bit
time.sleep(120)
@@ -271,8 +274,9 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=10)
@matrix(failure_mode=["clean_bounce", "hard_bounce"],
num_failures=[3],
- metadata_quorum=[quorum.isolated_kraft])
- def test_all_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum):
+ metadata_quorum=[quorum.isolated_kraft],
+ group_protocol=["classic", "streams"])
+ def test_all_brokers_bounce(self, failure_mode, num_failures,
metadata_quorum, group_protocol):
"""
Start a smoke test client, then kill a few brokers and ensure data is
still received
Record if records are delivered
@@ -284,7 +288,7 @@ class StreamsBrokerBounceTest(Test):
self.topics['__consumer_offsets'] = { 'partitions': 50,
'replication-factor': self.replication,
'configs':
{"min.insync.replicas": 1} }
- self.setup_system()
+ self.setup_system(group_protocol=group_protocol)
# Sleep to allow test to run for a bit
time.sleep(120)