This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3623726c766 KAFKA-18286: Implement support for streams groups in 
kafka-groups.sh (#19423)
3623726c766 is described below

commit 3623726c76673d1bc8bae22ea0bad2e8528b667a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Apr 11 11:17:27 2025 +0200

    KAFKA-18286: Implement support for streams groups in kafka-groups.sh 
(#19423)
    
    Add support for streams groups in kafka-groups.sh.
    
    The change adds command-line options `--streams` to list only streams
    groups, and value `--group-type streams`. Those two options are mutually
    exclusive with other group type and protocol filters specified on the
    command line.
    
    Includes a small integration test that spins up a kafka streams
    application and lists the group.
    
    Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedii
    <[email protected]>
---
 .../java/org/apache/kafka/tools/GroupsCommand.java |  26 +++-
 .../org/apache/kafka/tools/GroupsCommandTest.java  | 154 +++++++++++++++++++--
 2 files changed, 160 insertions(+), 20 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
index ec99a9e4c65..fce0338beec 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -104,18 +104,19 @@ public class GroupsCommand {
         public void listGroups(GroupsCommandOptions opts) throws Exception {
             Collection<GroupListing> resources = adminClient.listGroups()
                     .all().get(30, TimeUnit.SECONDS);
-            printGroupDetails(resources, opts.groupType(), opts.protocol(), 
opts.hasConsumerOption(), opts.hasShareOption());
+            printGroupDetails(resources, opts.groupType(), opts.protocol(), 
opts.hasConsumerOption(), opts.hasShareOption(), opts.hasStreamsOption());
         }
 
         private void printGroupDetails(Collection<GroupListing> groups,
                                        Optional<GroupType> groupTypeFilter,
                                        Optional<String> protocolFilter,
                                        boolean consumerGroupFilter,
-                                       boolean shareGroupFilter) {
+                                       boolean shareGroupFilter,
+                                       boolean streamsGroupFilter) {
             List<List<String>> lineItems = new ArrayList<>();
             int maxLen = 20;
             for (GroupListing group : groups) {
-                if (combinedFilter(group, groupTypeFilter, protocolFilter, 
consumerGroupFilter, shareGroupFilter)) {
+                if (combinedFilter(group, groupTypeFilter, protocolFilter, 
consumerGroupFilter, shareGroupFilter, streamsGroupFilter)) {
                     List<String> lineItem = new ArrayList<>();
                     lineItem.add(group.groupId());
                     
lineItem.add(group.type().map(GroupType::toString).orElse(""));
@@ -145,7 +146,8 @@ public class GroupsCommand {
                                        Optional<GroupType> groupTypeFilter,
                                        Optional<String> protocolFilter,
                                        boolean consumerGroupFilter,
-                                       boolean shareGroupFilter) {
+                                       boolean shareGroupFilter,
+                                       boolean streamsGroupFilter) {
             boolean pass = true;
             Optional<GroupType> groupType = group.type();
             String protocol = group.protocol();
@@ -159,6 +161,8 @@ public class GroupsCommand {
                 pass = protocol.equals("consumer") || protocol.isEmpty() || 
groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent();
             } else if (shareGroupFilter) {
                 pass = groupType.filter(gt -> gt == 
GroupType.SHARE).isPresent();
+            } else if (streamsGroupFilter) {
+                pass = groupType.filter(gt -> gt == 
GroupType.STREAMS).isPresent();
             }
             return pass;
         }
@@ -189,6 +193,8 @@ public class GroupsCommand {
 
         private final OptionSpecBuilder shareOpt;
 
+        private final OptionSpecBuilder streamsOpt;
+
         public GroupsCommandOptions(String[] args) {
             super(args);
             bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: 
The Kafka server to connect to.")
@@ -204,7 +210,7 @@ public class GroupsCommand {
             listOpt = parser.accepts("list", "List the groups.");
 
             groupTypeOpt = parser.accepts("group-type", "Filter the groups 
based on group type. "
-                            + "Valid types are: 'classic', 'consumer' and 
'share'.")
+                            + "Valid types are: 'classic', 'consumer', 'share' 
and 'streams'.")
                     .withRequiredArg()
                     .describedAs("type")
                     .ofType(String.class);
@@ -217,6 +223,7 @@ public class GroupsCommand {
             consumerOpt = parser.accepts("consumer", "Filter the groups to 
show all kinds of consumer groups, including classic and simple consumer 
groups. "
                             + "This matches group type 'consumer', and group 
type 'classic' where the protocol type is 'consumer' or empty.");
             shareOpt = parser.accepts("share", "Filter the groups to show 
share groups.");
+            streamsOpt = parser.accepts("streams", "Filter the groups to show 
streams groups.");
 
             try {
                 options = parser.parse(args);
@@ -275,6 +282,10 @@ public class GroupsCommand {
             return has(shareOpt);
         }
 
+        public boolean hasStreamsOption() {
+            return has(streamsOpt);
+        }
+
         public void checkArgs() {
             if (args.length == 0)
                 CommandLineUtils.printUsageAndExit(parser, "This tool helps to 
list groups of all types.");
@@ -293,8 +304,9 @@ public class GroupsCommand {
             }
 
             // check invalid args
-            CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
groupTypeOpt, protocolOpt, shareOpt);
-            CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, 
consumerOpt, groupTypeOpt, protocolOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, 
groupTypeOpt, protocolOpt, shareOpt, streamsOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, 
consumerOpt, groupTypeOpt, protocolOpt, streamsOpt);
+            CommandLineUtils.checkInvalidArgs(parser, options, streamsOpt, 
consumerOpt, groupTypeOpt, protocolOpt, shareOpt);
         }
     }
 }
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index 76789157977..b834c7784a8 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -31,12 +31,20 @@ import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -104,6 +112,14 @@ public class GroupsCommandTest {
         assertTrue(opts.hasListOption());
         assertTrue(opts.hasShareOption());
     }
+    
+    @Test
+    public void testOptionsListStreamsFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new 
GroupsCommand.GroupsCommandOptions(
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--streams"});
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.hasStreamsOption());
+    }
 
     @Test
     public void testOptionsListProtocolFilterSucceeds() {
@@ -123,6 +139,15 @@ public class GroupsCommandTest {
         assertEquals(GroupType.SHARE, opts.groupType().get());
     }
 
+    @Test
+    public void testOptionsListTypeStreamsFilterSucceeds() {
+        GroupsCommand.GroupsCommandOptions opts = new 
GroupsCommand.GroupsCommandOptions(
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--group-type", "streams"});
+        assertTrue(opts.hasListOption());
+        assertTrue(opts.groupType().isPresent());
+        assertEquals(GroupType.STREAMS, opts.groupType().get());
+    }
+
     @Test
     public void testOptionsListInvalidTypeFilterFails() {
         assertInitializeInvalidOptionsExitCode(1,
@@ -146,6 +171,18 @@ public class GroupsCommandTest {
             new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--consumer", "--share"});
     }
 
+    @Test
+    public void testOptionsListShareAndStreamsFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1,
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--share", "--streams"});
+    }
+
+    @Test
+    public void testOptionsListConsumerAndStreamsFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1,
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--consumer", "--streams"});
+    }
+
     @Test
     public void testOptionsListConsumerAndProtocolFilterFails() {
         assertInitializeInvalidOptionsExitCode(1,
@@ -169,7 +206,19 @@ public class GroupsCommandTest {
         assertInitializeInvalidOptionsExitCode(1,
             new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--share", "--group-type", "classic"});
     }
+    
+    @Test
+    public void testOptionsListStreamsAndProtocolFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1,
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--streams", "--protocol", "anyproto"});
+    }
 
+    @Test
+    public void testOptionsListStreamsAndTypeFilterFails() {
+        assertInitializeInvalidOptionsExitCode(1,
+            new String[] {"--bootstrap-server", bootstrapServer, "--list", 
"--streams", "--group-type", "classic"});
+    }
+    
     @Test
     public void testListGroupsEmpty() {
         Admin adminClient = mock(Admin.class);
@@ -198,7 +247,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -214,7 +264,8 @@ public class GroupsCommandTest {
         assertCapturedListOutput(capturedOutput,
                 new String[]{"CGclassic", "Classic", "consumer"},
                 new String[]{"CGconsumer", "Consumer", "consumer"},
-                new String[]{"SG", "Share", "share"});
+                new String[]{"SG", "Share", "share"},
+                new String[]{"StrG", "Streams", "streams"});
     }
 
     @Test
@@ -225,7 +276,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -251,7 +303,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
             new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
             new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), 
"consumer", Optional.of(GroupState.STABLE)),
-            new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+            new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+            new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -268,6 +321,32 @@ public class GroupsCommandTest {
             new String[]{"SG", "Share", "share"});
     }
 
+    @Test
+    public void testListGroupsStreamsFilter() {
+        Admin adminClient = mock(Admin.class);
+        GroupsCommand.GroupsService service = new 
GroupsCommand.GroupsService(adminClient);
+
+        ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
+            new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
+            new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), 
"consumer", Optional.of(GroupState.STABLE)),
+            new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+            new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
+        );
+        when(adminClient.listGroups()).thenReturn(result);
+
+        String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
+            try {
+                service.listGroups(new GroupsCommand.GroupsCommandOptions(
+                    new String[]{"--bootstrap-server", bootstrapServer, 
"--list", "--streams"}
+                ));
+            } catch (Throwable t) {
+                fail(t);
+            }
+        });
+        assertCapturedListOutput(capturedOutput,
+            new String[]{"StrG", "Streams", "streams"});
+    }
+
     @Test
     public void testListGroupsProtocolFilter() {
         Admin adminClient = mock(Admin.class);
@@ -276,7 +355,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -302,7 +382,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -327,7 +408,8 @@ public class GroupsCommandTest {
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), 
"consumer", Optional.of(GroupState.STABLE)),
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -351,7 +433,8 @@ public class GroupsCommandTest {
 
         ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
                 new GroupListing("CGconsumer", 
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
-                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE))
+                new GroupListing("SG", Optional.of(GroupType.SHARE), "share", 
Optional.of(GroupState.STABLE)),
+                new GroupListing("StrG", Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         );
         when(adminClient.listGroups()).thenReturn(result);
 
@@ -380,21 +463,24 @@ public class GroupsCommandTest {
         )));
     }
 
-    @SuppressWarnings("NPathComplexity")
+    @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
     @ClusterTest(
         serverProperties = {
-            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,share"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic,consumer,share,streams"),
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
             @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
         }
     )
     public void testGroupCommand(ClusterInstance clusterInstance) throws 
Exception {
         String topic = "topic";
+        String outputTopic = "output-topic";
         String classicGroupId = "classic_group";
         String consumerGroupId = "consumer_group";
         String shareGroupId = "share_group";
+        String streamsGroupId = "streams_group";
         String simpleGroupId = "simple_group";
-        clusterInstance.createTopic("topic", 1, (short) 1);
+        clusterInstance.createTopic(topic, 1, (short) 1);
+        clusterInstance.createTopic(outputTopic, 1, (short) 1);
         TopicPartition topicPartition = new TopicPartition(topic, 0);
 
         Properties props = new Properties();
@@ -403,6 +489,7 @@ public class GroupsCommandTest {
         try (KafkaConsumer<String, String> classicGroup = 
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
              KafkaConsumer<String, String> consumerGroup = 
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
              KafkaShareConsumer<String, String> shareGroup = 
createKafkaShareConsumer(clusterInstance, shareGroupId);
+             KafkaStreams streams = createKafkaStreams(clusterInstance, 
streamsGroupId, topic, outputTopic);
              Admin admin = clusterInstance.admin();
              GroupsCommand.GroupsService groupsCommand = new 
GroupsCommand.GroupsService(props)
         ) {
@@ -412,6 +499,7 @@ public class GroupsCommandTest {
             consumerGroup.poll(Duration.ofMillis(1000));
             shareGroup.subscribe(List.of(topic));
             shareGroup.poll(Duration.ofMillis(1000));
+            streams.start();
 
             AlterConsumerGroupOffsetsResult result = 
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new 
OffsetAndMetadata(0L)));
             assertNull(result.all().get());
@@ -420,12 +508,13 @@ public class GroupsCommandTest {
                 Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
                     assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
                         List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
-                if (res.getKey().split("\n").length == 5 && 
res.getValue().isEmpty()) {
+                if (res.getKey().split("\n").length == 6 && 
res.getValue().isEmpty()) {
                     assertCapturedListOutput(res.getKey(),
                         new String[]{classicGroupId, "Classic", "consumer"},
                         new String[]{consumerGroupId, "Consumer", "consumer"},
                         new String[]{simpleGroupId, "Classic"},
-                        new String[]{shareGroupId, "Share", "share"});
+                        new String[]{shareGroupId, "Share", "share"},
+                        new String[]{streamsGroupId, "Streams", "streams"});
                     return true;
                 }
                 return false;
@@ -482,6 +571,18 @@ public class GroupsCommandTest {
                 return false;
             }, "Waiting for listing groups to return share type groups");
 
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--group-type", 
"streams").toArray(new String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{streamsGroupId, "Streams", "streams"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return streams type groups");
+
             TestUtils.waitForCondition(() -> {
                 Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
                     assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
@@ -493,6 +594,19 @@ public class GroupsCommandTest {
                 }
                 return false;
             }, "Waiting for listing groups to return share type groups");
+
+
+            TestUtils.waitForCondition(() -> {
+                Map.Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(() ->
+                    assertDoesNotThrow(() -> groupsCommand.listGroups(new 
GroupsCommand.GroupsCommandOptions(
+                        List.of("--bootstrap-server", 
clusterInstance.bootstrapServers(), "--list", "--streams").toArray(new 
String[0])))));
+                if (res.getKey().split("\n").length == 2 && 
res.getValue().isEmpty()) {
+                    assertCapturedListOutput(res.getKey(),
+                        new String[]{streamsGroupId, "Streams", "streams"});
+                    return true;
+                }
+                return false;
+            }, "Waiting for listing groups to return streams type groups");
         }
     }
 
@@ -534,4 +648,18 @@ public class GroupsCommandTest {
             ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName(),
             ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()));
     }
+
+    private KafkaStreams createKafkaStreams(ClusterInstance clusterInstance, 
String groupId, String inputTopic, String outputTopic) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Long, String> input = builder.stream(inputTopic);
+        input.map((key, value) -> new KeyValue<>(key, key)).to(outputTopic, 
Produced.with(Serdes.Long(), Serdes.Long()));
+        Topology topology = builder.build();
+        Properties properties = new Properties();
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers());
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
+        properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        return new KafkaStreams(topology, properties);
+    }
 }

Reply via email to