This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f465fc1b62 KAFKA-19202: Enable KIP-1071 in
streams_standby_replica_test.py (#19625)
3f465fc1b62 is described below
commit 3f465fc1b6250ea337237487eca90ac23b3485da
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed May 7 09:43:11 2025 +0200
KAFKA-19202: Enable KIP-1071 in streams_standby_replica_test.py (#19625)
New system test for KIP-1071.
Standby replicas need to be enabled via `kafka-configs.sh`.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../kafka/streams/tests/StreamsStandByReplicaTest.java | 1 -
tests/kafkatest/tests/streams/base_streams_test.py | 15 ++++++++++++++-
.../tests/streams/streams_standby_replica_test.py | 12 ++++++++----
3 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
index d23915790b2..27771b5be16 100644
---
a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java
@@ -64,7 +64,6 @@ public class StreamsStandByReplicaTest {
Exit.exit(1);
}
- streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
"kafka-streams-standby-tasks");
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG,
0);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py
b/tests/kafkatest/tests/streams/base_streams_test.py
index 96ac192e606..81cad7a4d1b 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -33,6 +33,7 @@ class BaseStreamsTest(Test):
self.num_controllers = num_controllers
self.num_brokers = num_brokers
self.topics = topics
+ self.use_streams_groups = True
self.kafka = KafkaService(
test_context, self.num_brokers,
@@ -47,7 +48,8 @@ class BaseStreamsTest(Test):
def setUp(self):
self.kafka.start()
- self.kafka.run_features_command("upgrade", "streams.version", 1)
+ if self.use_streams_groups:
+ self.kafka.run_features_command("upgrade", "streams.version", 1)
def get_consumer(self, client_id, topic, num_messages):
return VerifiableConsumer(self.test_context,
@@ -96,6 +98,17 @@ class BaseStreamsTest(Test):
timeout_sec=timeout_sec,
err_msg="At %s streams did not process messages in %s
seconds " % (test_state, timeout_sec))
+ def configure_standby_replicas(self, group_id, num_standby_replicas):
+ force_use_zk_connection = not
self.kafka.all_nodes_configs_command_uses_bootstrap_server()
+ node = self.kafka.nodes[0]
+ cmd = "%s --alter --add-config streams.num.standby.replicas=%d
--entity-type groups --entity-name %s" % \
+ (
+
self.kafka.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection),
+ num_standby_replicas,
+ group_id
+ )
+ node.account.ssh(cmd)
+
@staticmethod
def get_configs(group_protocol="classic", extra_configs=""):
# Consumer max.poll.interval > min(max.block.ms, ((retries + 1) *
request.timeout)
diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
index 8270652645a..77e3c65aab9 100644
--- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py
+++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py
@@ -47,19 +47,23 @@ class StreamsStandbyTask(BaseStreamsTest):
})
@cluster(num_nodes=10)
- @matrix(metadata_quorum=[quorum.isolated_kraft])
- def test_standby_tasks_rebalance(self, metadata_quorum):
+ @matrix(metadata_quorum=[quorum.combined_kraft],
+ group_protocol=["classic", "streams"])
+ def test_standby_tasks_rebalance(self, metadata_quorum, group_protocol):
# TODO KIP-441: consider rewriting the test for
HighAvailabilityTaskAssignor
configs = self.get_configs(
- group_protocol="classic",
+ group_protocol=group_protocol,
extra_configs=
-
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
% (
+
",application.id=test_standby_tasks_rebalance,sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
% (
self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2
)
)
+ if group_protocol == "streams":
+ self.configure_standby_replicas("test_standby_tasks_rebalance", 1)
+
producer = self.get_producer(self.streams_source_topic,
self.num_messages, throughput=15000, repeating_keys=6)
producer.start()