AndrewJSchofield commented on code in PR #17712:
URL: https://github.com/apache/kafka/pull/17712#discussion_r1837630024
##########
tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java:
##########
@@ -357,6 +382,122 @@ public void testListGroupsFailsWithException() {
)));
}
+ @SuppressWarnings("NPathComplexity")
+ @ClusterTest(
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic,consumer,share"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ }
+ )
+ public void testGroupCommand(ClusterInstance clusterInstance) throws
Exception {
+ String topic = "topic";
+ String classicGroupId = "classic_group";
+ String consumerGroupId = "consumer_group";
+ String shareGroupId = "share_group";
+ String simpleGroupId = "simple_group";
+ clusterInstance.createTopic("topic", 1, (short) 1);
+ TopicPartition topicPartition = new TopicPartition(topic, 0);
+
+ Properties props = new Properties();
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers());
+
+ try (KafkaConsumer<String, String> classicGroup =
createKafkaConsumer(clusterInstance, classicGroupId, GroupProtocol.CLASSIC);
+ KafkaConsumer<String, String> consumerGroup =
createKafkaConsumer(clusterInstance, consumerGroupId, GroupProtocol.CONSUMER);
+ KafkaShareConsumer<String, String> shareGroup =
createKafkaShareConsumer(clusterInstance, shareGroupId);
+ Admin admin = clusterInstance.createAdminClient();
+ GroupsCommand.GroupsService groupsCommand = new
GroupsCommand.GroupsService(props)
+ ) {
+ classicGroup.subscribe(List.of(topic));
+ classicGroup.poll(Duration.ofMillis(1000));
+ consumerGroup.subscribe(List.of(topic));
+ consumerGroup.poll(Duration.ofMillis(1000));
+ shareGroup.subscribe(List.of(topic));
+ shareGroup.poll(Duration.ofMillis(1000));
+
+ AlterConsumerGroupOffsetsResult result =
admin.alterConsumerGroupOffsets(simpleGroupId, Map.of(topicPartition, new
OffsetAndMetadata(0L)));
+ assertNull(result.all().get());
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 5 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{consumerGroupId, "Consumer", "consumer"},
+ new String[]{simpleGroupId, "Classic"},
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return all groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--consumer").toArray(new
String[0])))));
+ if (res.getKey().split("\n").length == 4 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{consumerGroupId, "Consumer", "consumer"},
+ new String[]{simpleGroupId, "Classic"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return consumer protocol
groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"classic").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 3 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{classicGroupId, "Classic", "consumer"},
+ new String[]{simpleGroupId, "Classic"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return classic type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"consumer").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{consumerGroupId, "Consumer", "consumer"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return consumer type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--group-type",
"share").toArray(new String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return share type groups");
+
+ TestUtils.waitForCondition(() -> {
+ Map.Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(() ->
+ assertDoesNotThrow(() -> groupsCommand.listGroups(new
GroupsCommand.GroupsCommandOptions(
+ List.of("--bootstrap-server",
clusterInstance.bootstrapServers(), "--list", "--share").toArray(new
String[0])))));
+ if (res.getKey().split("\n").length == 2 &&
res.getValue().isEmpty()) {
+ assertCapturedListOutput(res.getKey(),
+ new String[]{shareGroupId, "Share", "share"});
+ return true;
+ }
+ return false;
+ }, "Waiting for listing groups to return share groups");
Review Comment:
nit: To match the other cases, this message should read `"Waiting for listing
groups to return share type groups"`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org