This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3623726c766 KAFKA-18286: Implement support for streams groups in
kafka-groups.sh (#19423)
3623726c766 is described below
commit 3623726c76673d1bc8bae22ea0bad2e8528b667a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Apr 11 11:17:27 2025 +0200
KAFKA-18286: Implement support for streams groups in kafka-groups.sh
(#19423)
Add support for streams groups in kafka-groups.sh.
The change adds command-line options `--streams` to list only streams
groups, and value `--group-type streams`. Those two options are mutually
exclusive with other group type and protocol filters specified on the
command line.
Includes a small integration test that spins up a kafka streams
application and lists the group.
Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedii
<[email protected]>
---
.../java/org/apache/kafka/tools/GroupsCommand.java | 26 +++-
.../org/apache/kafka/tools/GroupsCommandTest.java | 154 +++++++++++++++++++--
2 files changed, 160 insertions(+), 20 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
index ec99a9e4c65..fce0338beec 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -104,18 +104,19 @@ public class GroupsCommand {
public void listGroups(GroupsCommandOptions opts) throws Exception {
Collection<GroupListing> resources = adminClient.listGroups()
.all().get(30, TimeUnit.SECONDS);
- printGroupDetails(resources, opts.groupType(), opts.protocol(),
opts.hasConsumerOption(), opts.hasShareOption());
+ printGroupDetails(resources, opts.groupType(), opts.protocol(),
opts.hasConsumerOption(), opts.hasShareOption(), opts.hasStreamsOption());
}
private void printGroupDetails(Collection<GroupListing> groups,
Optional<GroupType> groupTypeFilter,
Optional<String> protocolFilter,
boolean consumerGroupFilter,
- boolean shareGroupFilter) {
+ boolean shareGroupFilter,
+ boolean streamsGroupFilter) {
List<List<String>> lineItems = new ArrayList<>();
int maxLen = 20;
for (GroupListing group : groups) {
- if (combinedFilter(group, groupTypeFilter, protocolFilter,
consumerGroupFilter, shareGroupFilter)) {
+ if (combinedFilter(group, groupTypeFilter, protocolFilter,
consumerGroupFilter, shareGroupFilter, streamsGroupFilter)) {
List<String> lineItem = new ArrayList<>();
lineItem.add(group.groupId());
lineItem.add(group.type().map(GroupType::toString).orElse(""));
@@ -145,7 +146,8 @@ public class GroupsCommand {
Optional<GroupType> groupTypeFilter,
Optional<String> protocolFilter,
boolean consumerGroupFilter,
- boolean shareGroupFilter) {
+ boolean shareGroupFilter,
+ boolean streamsGroupFilter) {
boolean pass = true;
Optional<GroupType> groupType = group.type();
String protocol = group.protocol();
@@ -159,6 +161,8 @@ public class GroupsCommand {
pass = protocol.equals("consumer") || protocol.isEmpty() ||
groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent();
} else if (shareGroupFilter) {
pass = groupType.filter(gt -> gt ==
GroupType.SHARE).isPresent();
+ } else if (streamsGroupFilter) {
+ pass = groupType.filter(gt -> gt ==
GroupType.STREAMS).isPresent();
}
return pass;
}
@@ -189,6 +193,8 @@ public class GroupsCommand {
private final OptionSpecBuilder shareOpt;
+ private final OptionSpecBuilder streamsOpt;
+
public GroupsCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The Kafka server to connect to.")
@@ -204,7 +210,7 @@ public class GroupsCommand {
listOpt = parser.accepts("list", "List the groups.");
groupTypeOpt = parser.accepts("group-type", "Filter the groups
based on group type. "
- + "Valid types are: 'classic', 'consumer' and
'share'.")
+ + "Valid types are: 'classic', 'consumer', 'share'
and 'streams'.")
.withRequiredArg()
.describedAs("type")
.ofType(String.class);
@@ -217,6 +223,7 @@ public class GroupsCommand {
consumerOpt = parser.accepts("consumer", "Filter the groups to
show all kinds of consumer groups, including classic and simple consumer
groups. "
+ "This matches group type 'consumer', and group
type 'classic' where the protocol type is 'consumer' or empty.");
shareOpt = parser.accepts("share", "Filter the groups to show
share groups.");
+ streamsOpt = parser.accepts("streams", "Filter the groups to show
streams groups.");
try {
options = parser.parse(args);
@@ -275,6 +282,10 @@ public class GroupsCommand {
return has(shareOpt);
}
+ public boolean hasStreamsOption() {
+ return has(streamsOpt);
+ }
+
public void checkArgs() {
if (args.length == 0)
CommandLineUtils.printUsageAndExit(parser, "This tool helps to
list groups of all types.");
@@ -293,8 +304,9 @@ public class GroupsCommand {
}
// check invalid args
- CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
groupTypeOpt, protocolOpt, shareOpt);
- CommandLineUtils.checkInvalidArgs(parser, options, shareOpt,
consumerOpt, groupTypeOpt, protocolOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt,
groupTypeOpt, protocolOpt, shareOpt, streamsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, shareOpt,
consumerOpt, groupTypeOpt, protocolOpt, streamsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, streamsOpt,
consumerOpt, groupTypeOpt, protocolOpt, shareOpt);
}
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
index 76789157977..b834c7784a8 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java
@@ -31,12 +31,20 @@ import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -104,6 +112,14 @@ public class GroupsCommandTest {
assertTrue(opts.hasListOption());
assertTrue(opts.hasShareOption());
}
+
+ @Test
+ public void testOptionsListStreamsFilterSucceeds() {
+ GroupsCommand.GroupsCommandOptions opts = new
GroupsCommand.GroupsCommandOptions(
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--streams"});
+ assertTrue(opts.hasListOption());
+ assertTrue(opts.hasStreamsOption());
+ }
@Test
public void testOptionsListProtocolFilterSucceeds() {
@@ -123,6 +139,15 @@ public class GroupsCommandTest {
assertEquals(GroupType.SHARE, opts.groupType().get());
}
+ @Test
+ public void testOptionsListTypeStreamsFilterSucceeds() {
+ GroupsCommand.GroupsCommandOptions opts = new
GroupsCommand.GroupsCommandOptions(
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--group-type", "streams"});
+ assertTrue(opts.hasListOption());
+ assertTrue(opts.groupType().isPresent());
+ assertEquals(GroupType.STREAMS, opts.groupType().get());
+ }
+
@Test
public void testOptionsListInvalidTypeFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
@@ -146,6 +171,18 @@ public class GroupsCommandTest {
new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--consumer", "--share"});
}
+ @Test
+ public void testOptionsListShareAndStreamsFilterFails() {
+ assertInitializeInvalidOptionsExitCode(1,
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--share", "--streams"});
+ }
+
+ @Test
+ public void testOptionsListConsumerAndStreamsFilterFails() {
+ assertInitializeInvalidOptionsExitCode(1,
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--consumer", "--streams"});
+ }
+
@Test
public void testOptionsListConsumerAndProtocolFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
@@ -169,7 +206,19 @@ public class GroupsCommandTest {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--share", "--group-type", "classic"});
}
+
+ @Test
+ public void testOptionsListStreamsAndProtocolFilterFails() {
+ assertInitializeInvalidOptionsExitCode(1,
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--streams", "--protocol", "anyproto"});
+ }
+ @Test
+ public void testOptionsListStreamsAndTypeFilterFails() {
+ assertInitializeInvalidOptionsExitCode(1,
+ new String[] {"--bootstrap-server", bootstrapServer, "--list",
"--streams", "--group-type", "classic"});
+ }
+
@Test
public void testListGroupsEmpty() {
Admin adminClient = mock(Admin.class);
@@ -198,7 +247,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -214,7 +264,8 @@ public class GroupsCommandTest {
assertCapturedListOutput(capturedOutput,
new String[]{"CGclassic", "Classic", "consumer"},
new String[]{"CGconsumer", "Consumer", "consumer"},
- new String[]{"SG", "Share", "share"});
+ new String[]{"SG", "Share", "share"},
+ new String[]{"StrG", "Streams", "streams"});
}
@Test
@@ -225,7 +276,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -251,7 +303,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER),
"consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -268,6 +321,32 @@ public class GroupsCommandTest {
new String[]{"SG", "Share", "share"});
}
+ @Test
+ public void testListGroupsStreamsFilter() {
+ Admin adminClient = mock(Admin.class);
+ GroupsCommand.GroupsService service = new
GroupsCommand.GroupsService(adminClient);
+
+ ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
+ new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
+ new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER),
"consumer", Optional.of(GroupState.STABLE)),
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
+ );
+ when(adminClient.listGroups()).thenReturn(result);
+
+ String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
+ try {
+ service.listGroups(new GroupsCommand.GroupsCommandOptions(
+ new String[]{"--bootstrap-server", bootstrapServer,
"--list", "--streams"}
+ ));
+ } catch (Throwable t) {
+ fail(t);
+ }
+ });
+ assertCapturedListOutput(capturedOutput,
+ new String[]{"StrG", "Streams", "streams"});
+ }
+
@Test
public void testListGroupsProtocolFilter() {
Admin adminClient = mock(Admin.class);
@@ -276,7 +355,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -302,7 +382,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -327,7 +408,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC),
"consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -351,7 +433,8 @@ public class GroupsCommandTest {
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGconsumer",
Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
- new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE))
+ new GroupListing("SG", Optional.of(GroupType.SHARE), "share",
Optional.of(GroupState.STABLE)),
+ new GroupListing("StrG", Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@@ -380,21 +463,24 @@ public class GroupsCommandTest {
)));
}
- @SuppressWarnings("NPathComplexity")
+ @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
@ClusterTest(
serverProperties = {
- @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,share"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,share,streams"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
}
)
public void testGroupCommand(ClusterInstance clusterInstance) throws
Exception {
String topic = "topic";
+ String outputTopic = "output-topic";
String classicGroupId = "classic_group";
String consumerGroupId = "consumer_group";
String shareGroupId = "share_group";
+ String streamsGroupId = "streams_group";
String simpleGroupId = "simple_group";
- clusterInstance.createTopic("topic", 1, (short) 1);
+ clusterInstance.createTopic(topic, 1, (short) 1);
+ clusterInstance.createTopic(outputTopic, 1, (short) 1);
TopicPartition topicPartition = new TopicPartition(topic, 0);
Properties props = new Properties();
@@ -403,6 +489,7 @@ public class GroupsCommandTest {
try (KafkaConsumer<String, String> classicGroup =
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
KafkaConsumer<String, String> consumerGroup =
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
KafkaShareConsumer<String, String> shareGroup =
createKafkaShareConsumer(clusterInstance, shareGroupId);
+ KafkaStreams streams = createKafkaStreams(clusterInstance,
streamsGroupId, topic, outputTopic);
Admin admin = clusterInstance.admin();
GroupsCommand.GroupsService groupsCommand = new
GroupsCommand.GroupsService(props)
) {
@@ -412,6 +499,7 @@ public class GroupsCommandTest {
consumerGroup.poll(Duration.ofMillis(1000));
shareGroup.subscribe(List.of(topic));
shareGroup.poll(Duration.ofMillis(1000));
+ streams.start();
AlterConsumerGroupOffsetsResult result =
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new
OffsetAndMetadata(0L)));
assertNull(result.all().get());
@@ -420,12 +508,13 @@ public class GroupsCommandTest {
Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
- if (res.getKey().split("\n").length == 5 &&
res.getValue().isEmpty()) {
+ if (res.getKey().split("\n").length == 6 &&
res.getValue().isEmpty()) {
assertCapturedListOutput(res.getKey(),
new String[]{classicGroupId, "Classic", "consumer"},
new String[]{consumerGroupId, "Consumer", "consumer"},
new String[]{simpleGroupId, "Classic"},
- new String[]{shareGroupId, "Share", "share"});
+ new String[]{shareGroupId, "Share", "share"},
+ new String[]{streamsGroupId, "Streams", "streams"});
return true;
}
return false;
@@ -482,6 +571,18 @@ public class GroupsCommandTest {
return false;
}, "Waiting for listing groups to return share type groups");
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"streams").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{streamsGroupId, "Streams", "streams"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return streams type groups");
+
TestUtils.waitForCondition(() -> {
Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
@@ -493,6 +594,19 @@ public class GroupsCommandTest {
}
return false;
}, "Waiting for listing groups to return share type groups");
+
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--streams").toArray(new
String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{streamsGroupId, "Streams", "streams"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return streams type groups");
}
}
@@ -534,4 +648,18 @@ public class GroupsCommandTest {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName(),
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName()));
}
+
+ private KafkaStreams createKafkaStreams(ClusterInstance clusterInstance,
String groupId, String inputTopic, String outputTopic) {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<Long, String> input = builder.stream(inputTopic);
+ input.map((key, value) -> new KeyValue<>(key, key)).to(outputTopic,
Produced.with(Serdes.Long(), Serdes.Long()));
+ Topology topology = builder.build();
+ Properties properties = new Properties();
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId);
+ properties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ return new KafkaStreams(topology, properties);
+ }
}