This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e0276ebb71 KAFKA-19127: Integration test for altering and describing streams group configs (#19436)
3e0276ebb71 is described below

commit 3e0276ebb7127fde4e7aac1ad0d4583ec250d049
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Apr 11 12:48:02 2025 +0200

    KAFKA-19127: Integration test for altering and describing streams group configs (#19436)
    
    This PR introduces integration tests that verify, for a streams group:
    - altering configs
    - the `describe` output
    
    The streams group configs have been defined in `KIP-1071`.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/tools/ConfigCommandIntegrationTest.java  | 40 ++++++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 2587a6c150f..a5c91a632f7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -217,6 +217,46 @@ public class ConfigCommandIntegrationTest {
         verifyGroupConfigUpdate(asList("--group", defaultGroupName, "--alter"));
     }
 
+    @ClusterTest
+    public void testDescribeStreamsGroupConfigs() {
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+
+        assertTrue(message.contains("streams.heartbeat.interval.ms=5000 sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
+        assertTrue(message.contains("streams.num.standby.replicas=0 sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
+        assertTrue(message.contains("streams.session.timeout.ms=45000 sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
+    }
+
+    @ClusterTest
+    public void testAlterStreamsGroupNumOfStandbyReplicas() {
+        // Verify the initial config
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.num.standby.replicas=0"));
+
+        // Alter number of standby replicas
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.num.standby.replicas=1"));
+        message = captureStandardOut(run(command));
+        assertEquals("Completed updating config for group group.", message);
+
+        // Verify the updated config
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe"));
+        message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.num.standby.replicas=1"));
+    }
+
     private void verifyGroupConfigUpdate(List<String> alterOpts) throws Exception {
         try (Admin client = cluster.admin()) {
             // Add config

Reply via email to