dajac commented on code in PR #15150: URL: https://github.com/apache/kafka/pull/15150#discussion_r1483062130
########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2811,6 +2811,71 @@ public void testListConsumerGroupsWithStates() throws Exception { } } + @Test + public void testListConsumerGroupsWithTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with no specific list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CLASSIC.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CLASSIC.toString())))), + env.cluster().nodeById(0)); + + final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); + final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); + Collection<ConsumerGroupListing> listings = result.valid().get(); + + assertEquals(2, listings.size()); + List<ConsumerGroupListing> expected = new ArrayList<>(); + expected.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CLASSIC))); + expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + + // Test with list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( Review Comment: It may be worth validating that the request also contains the expected filters. For this, you can pass a request matcher as the first argument here. There are examples in this file. We could also add it to the previous case. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + if (includeType || includeState) { + val types = typeValues() + val states = stateValues() + val listings = listConsumerGroupsWithFilters(types, states) + + printGroupInfo(listings, includeType, includeState) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def stateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def typeValues(): Set[GroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[GroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def printGroupInfo(groups: List[ConsumerGroupListing], includeType: Boolean, includeState: Boolean): Unit = { + def groupId(groupListing: ConsumerGroupListing): String = groupListing.groupId + def groupType(groupListing: ConsumerGroupListing): String = groupListing.`type`().orElse(GroupType.UNKNOWN).toString + def groupState(groupListing: ConsumerGroupListing): String = groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString + + val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => Math.max(maxLen, groupId(groupListing).length)) + 10 + var format = s"%-${maxGroupLen}s" + var header = List("Group") Review Comment: nit: `GROUP` as it was before. ########## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ########## @@ -2849,6 +2914,44 @@ public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio } } + @Test + public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV4 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 4); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check if we can list groups with older broker if we don't specify types. + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(Collections.singletonList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), + env.cluster().nodeById(0)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); + ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); + Collection<ConsumerGroupListing> listing = result.all().get(); + assertEquals(1, listing.size()); + List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false)); + assertEquals(expected, listing); Review Comment: I wonder if this is really necessary. We already test this in the other test. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -111,37 +263,124 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): Unit = { + var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer") + assertEquals(Set(GroupType.CONSUMER), result) + + result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic") + assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + + result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic") + assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic")) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" , ,")) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) var out = "" var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) - }, s"Expected to find $simpleGroup, $group and no header, but found $out") + !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) - }, s"Expected to find $simpleGroup, $group and the header, but found $out") + out.contains("STATE") && !out.contains("TYPE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "--type") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "Stable") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") - }, s"Expected to find $group in state Stable and the header, but found $out") + out.contains("STATE") && out.contains(group) && out.contains("Stable") && out.contains(protocolGroup) + }, s"Expected to find $group, $protocolGroup in state Stable and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "stable") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") - }, s"Expected to find $group in state Stable and the header, but found $out") + out.contains("STATE") && out.contains(group) && out.contains("Stable") && out.contains(protocolGroup) + }, s"Expected to find $group, $protocolGroup in state Stable and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type", "Classic") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("Classic") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type", "classic") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("Classic") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @CsvSource(Array("kraft+kip848,consumer")) + def testListGroupCommandConsumerProtocol(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" Review Comment: btw, I was wondering why tests fail in this suite but not in DescribeConsumerGroupTest. I have noticed that there we call `createOffsetsTopic()` in each tests. We could also try this here. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +67,177 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, ConsumerGroupState.values.toSet).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) + + val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + val service = getConsumerGroupService(cgcArgs) + + val expectedListingStable = Set.empty[ConsumerGroupListing] + + val expectedListing = Set( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + protocolGroup, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) + + // No filters explicitly mentioned. Expectation is that all groups are returned. + var foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set.empty).toSet + expectedListing == foundListing + }, s"Expected to show groups $expectedListing, but found $foundListing") Review Comment: nit: This code is repeated many times. I wonder if it may be worth extracting it into an helper method. It would make the test cases smaller. ########## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ########## @@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging { } def listGroups(): Unit = { - if (opts.options.has(opts.stateOpt)) { - val stateValue = opts.options.valueOf(opts.stateOpt) - val states = if (stateValue == null || stateValue.isEmpty) - Set[ConsumerGroupState]() - else - consumerGroupStatesFromString(stateValue) - val listings = listConsumerGroupsWithState(states) - printGroupStates(listings.map(e => (e.groupId, e.state.get.toString))) - } else + val includeType = opts.options.has(opts.typeOpt) + val includeState = opts.options.has(opts.stateOpt) + + if (includeType || includeState) { + val types = typeValues() + val states = stateValues() + val listings = listConsumerGroupsWithFilters(types, states) + + printGroupInfo(listings, includeType, includeState) + + } else { listConsumerGroups().foreach(println(_)) + } + } + + private def stateValues(): Set[ConsumerGroupState] = { + val stateValue = opts.options.valueOf(opts.stateOpt) + if (stateValue == null || stateValue.isEmpty) + Set[ConsumerGroupState]() + else + consumerGroupStatesFromString(stateValue) + } + + private def typeValues(): Set[GroupType] = { + val typeValue = opts.options.valueOf(opts.typeOpt) + if (typeValue == null || typeValue.isEmpty) + Set[GroupType]() + else + consumerGroupTypesFromString(typeValue) + } + + private def printGroupInfo(groups: List[ConsumerGroupListing], includeType: Boolean, includeState: Boolean): Unit = { + def groupId(groupListing: ConsumerGroupListing): String = groupListing.groupId + def groupType(groupListing: ConsumerGroupListing): String = groupListing.`type`().orElse(GroupType.UNKNOWN).toString + def groupState(groupListing: ConsumerGroupListing): String = groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString + + val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => Math.max(maxLen, groupId(groupListing).length)) + 10 Review Comment: nit: It looks like `+ 10` was not there before. Why do we need it? ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +67,177 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, ConsumerGroupState.values.toSet).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) Review Comment: nit: We probably don't need this one as we only test with the classic protocol in this test. ########## clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java: ########## @@ -51,6 +53,27 @@ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optio this.groupId = groupId; this.isSimpleConsumerGroup = isSimpleConsumerGroup; this.state = Objects.requireNonNull(state); + this.type = Optional.empty(); Review Comment: nit: Let's call the other constructor here. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -111,37 +263,124 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) Review Comment: nit: `@Test`? ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -111,37 +263,124 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")) } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testListGroupCommand(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String): Unit = { + var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer") + assertEquals(Set(GroupType.CONSUMER), result) + + result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, classic") + assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + + result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic") + assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic")) + + assertThrows(classOf[IllegalArgumentException], () => ConsumerGroupCommand.consumerGroupTypesFromString(" , ,")) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: String): Unit = { val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + addSimpleGroupExecutor(group = simpleGroup) addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) var out = "" var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) - }, s"Expected to find $simpleGroup, $group and no header, but found $out") + !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) - }, s"Expected to find $simpleGroup, $group and the header, but found $out") + out.contains("STATE") && !out.contains("TYPE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "--type") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "Stable") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") - }, s"Expected to find $group in state Stable and the header, but found $out") + out.contains("STATE") && out.contains(group) && out.contains("Stable") && out.contains(protocolGroup) + }, s"Expected to find $group, $protocolGroup in state Stable and the header, but found $out") cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--state", "stable") TestUtils.waitUntilTrue(() => { out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) - out.contains("STATE") && out.contains(group) && out.contains("Stable") - }, s"Expected to find $group in state Stable and the header, but found $out") + out.contains("STATE") && out.contains(group) && out.contains("Stable") && out.contains(protocolGroup) + }, s"Expected to find $group, $protocolGroup in state Stable and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type", "Classic") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("Classic") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") + + cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type", "classic") + TestUtils.waitUntilTrue(() => { + out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs)) + out.contains("TYPE") && out.contains("Classic") && !out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup) + }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, but found $out") } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @CsvSource(Array("kraft+kip848,consumer")) Review Comment: It would be better to also have a method as the source (like the others) for this one. We can add one if does not exist yet. ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +67,177 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, ConsumerGroupState.values.toSet).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet expectedListingStable == foundListing }, s"Expected to show groups $expectedListingStable, but found $foundListing") } - @ParameterizedTest - @ValueSource(strings = Array("zk", "kraft")) - def testConsumerGroupStatesFromString(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")) + def testListConsumerGroupsWithTypesClassicProtocol(quorum: String, groupProtocol: String): Unit = { + val simpleGroup = "simple-group" + val protocolGroup = "protocol-group" + + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, groupProtocol = groupProtocol) + + val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", "--type") + val service = getConsumerGroupService(cgcArgs) + + val expectedListingStable = Set.empty[ConsumerGroupListing] + + val expectedListing = Set( + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + protocolGroup, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) + + // No filters explicitly mentioned. Expectation is that all groups are returned. + var foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set.empty).toSet + expectedListing == foundListing + }, s"Expected to show groups $expectedListing, but found $foundListing") + + // When group type is mentioned: + // Old Group Coordinator returns empty listings if the type is not Classic. + // New Group Coordinator returns groups according to the filter. + foundListing = Set.empty[ConsumerGroupListing] + TestUtils.waitUntilTrue(() => { + foundListing = service.listConsumerGroupsWithFilters(Set(GroupType.CONSUMER), Set.empty).toSet + expectedListingStable == foundListing Review Comment: nit: Should we remove `expectedListingStable` and directly use `Set.empty[ConsumerGroupListing]` here? ########## core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala: ########## @@ -64,28 +67,177 @@ class ListConsumerGroupTest extends ConsumerGroupCommandTest { val service = getConsumerGroupService(cgcArgs) val expectedListing = Set( - new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + new ConsumerGroupListing( + simpleGroup, + true, + Optional.of(ConsumerGroupState.EMPTY), + Optional.of(GroupType.CLASSIC) + ), + new ConsumerGroupListing( + group, + false, + Optional.of(ConsumerGroupState.STABLE), + Optional.of(GroupType.CLASSIC) + ) + ) var foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, ConsumerGroupState.values.toSet).toSet expectedListing == foundListing }, s"Expected to show groups $expectedListing, but found $foundListing") - val expectedListingStable = Set( - new ConsumerGroupListing(group, false, Optional.of(ConsumerGroupState.STABLE))) + val expectedListingStable = Set.empty[ConsumerGroupListing] foundListing = Set.empty[ConsumerGroupListing] TestUtils.waitUntilTrue(() => { - foundListing = service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet + foundListing = service.listConsumerGroupsWithFilters(Set.empty, Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet Review Comment: Why are we changing this? It you really want to test with a state that does not exist, it may be better to keep this one as it was and to add it after it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org