This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f465fc1b62 KAFKA-19202: Enable KIP-1071 in streams_standby_replica_test.py (#19625)
3f465fc1b62 is described below

commit 3f465fc1b6250ea337237487eca90ac23b3485da
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed May 7 09:43:11 2025 +0200

    KAFKA-19202: Enable KIP-1071 in streams_standby_replica_test.py (#19625)
    
    New system test for KIP-1071.
    
    Standby replicas need to be enabled via `kafka-configs.sh`.
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 .../kafka/streams/tests/StreamsStandByReplicaTest.java    |  1 -
 tests/kafkatest/tests/streams/base_streams_test.py        | 15 ++++++++++++++-
 .../tests/streams/streams_standby_replica_test.py         | 12 ++++++++----
 3 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index d23915790b2..27771b5be16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -64,7 +64,6 @@ public class StreamsStandByReplicaTest {
             Exit.exit(1);
         }
         
-        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
         streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
         streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 96ac192e606..81cad7a4d1b 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -33,6 +33,7 @@ class BaseStreamsTest(Test):
         self.num_controllers = num_controllers
         self.num_brokers = num_brokers
         self.topics = topics
+        self.use_streams_groups = True
 
         self.kafka = KafkaService(
             test_context, self.num_brokers,
@@ -47,7 +48,8 @@ class BaseStreamsTest(Test):
 
     def setUp(self):
         self.kafka.start()
-        self.kafka.run_features_command("upgrade", "streams.version", 1)
+        if self.use_streams_groups:
+            self.kafka.run_features_command("upgrade", "streams.version", 1)
 
     def get_consumer(self, client_id, topic, num_messages):
         return VerifiableConsumer(self.test_context,
@@ -96,6 +98,17 @@ class BaseStreamsTest(Test):
                    timeout_sec=timeout_sec,
                    err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec))
 
+    def configure_standby_replicas(self, group_id, num_standby_replicas):
+        force_use_zk_connection = not self.kafka.all_nodes_configs_command_uses_bootstrap_server()
+        node = self.kafka.nodes[0]
+        cmd = "%s --alter --add-config streams.num.standby.replicas=%d --entity-type groups --entity-name %s" % \
+              (
+                    self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection),
+                    num_standby_replicas,
+                    group_id
+              )
+        node.account.ssh(cmd)
+
     @staticmethod
     def get_configs(group_protocol="classic", extra_configs=""):
         # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 8270652645a..77e3c65aab9 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -47,19 +47,23 @@ class StreamsStandbyTask(BaseStreamsTest):
                                                  })
 
     @cluster(num_nodes=10)
-    @matrix(metadata_quorum=[quorum.isolated_kraft])
-    def test_standby_tasks_rebalance(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_standby_tasks_rebalance(self, metadata_quorum, group_protocol):
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         configs = self.get_configs(
-            group_protocol="classic",
+            group_protocol=group_protocol,
             extra_configs=
-            ",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
+            ",application.id=test_standby_tasks_rebalance,sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
             self.streams_source_topic,
             self.streams_sink_topic_1,
             self.streams_sink_topic_2
             )
         )
 
+        if group_protocol == "streams":
+            self.configure_standby_replicas("test_standby_tasks_rebalance", 1)
+
         producer = self.get_producer(self.streams_source_topic, self.num_messages, throughput=15000, repeating_keys=6)
         producer.start()
 

Reply via email to