This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3e0276ebb71 KAFKA-19127: Integration test for altering and describing
streams group configs (#19436)
3e0276ebb71 is described below
commit 3e0276ebb7127fde4e7aac1ad0d4583ec250d049
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Apr 11 12:48:02 2025 +0200
KAFKA-19127: Integration test for altering and describing streams group
configs (#19436)
This PR introduces integration tests for verifying
- altering configs
- `describe` output
for a streams group. The configs have been defined in `KIP-1071`.
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/tools/ConfigCommandIntegrationTest.java | 40 ++++++++++++++++++++++
1 file changed, 40 insertions(+)
diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 2587a6c150f..a5c91a632f7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -217,6 +217,46 @@ public class ConfigCommandIntegrationTest {
verifyGroupConfigUpdate(asList("--group", defaultGroupName, "--alter"));
}
+ @ClusterTest
+ public void testDescribeStreamsGroupConfigs() {
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+
+ assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
+ assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
+ assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
+ }
+
+ @ClusterTest
+ public void testAlterStreamsGroupNumOfStandbyReplicas() {
+ // Verify the initial config
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.num.standby.replicas=0"));
+
+ // Alter number of standby replicas
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.num.standby.replicas=1"));
+ message = captureStandardOut(run(command));
+ assertEquals("Completed updating config for group group.", message);
+
+ // Verify the updated config
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe"));
+ message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.num.standby.replicas=1"));
+ }
+
private void verifyGroupConfigUpdate(List<String> alterOpts) throws Exception {
try (Admin client = cluster.admin()) {
// Add config