hachikuji commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r431596763



##########
File path: clients/src/main/resources/common/message/ListGroupsResponse.json
##########
@@ -34,7 +36,9 @@
       { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
         "about": "The group ID." },
       { "name": "ProtocolType", "type": "string", "versions": "0+",
-        "about": "The group protocol type." }
+        "about": "The group protocol type." },
+      { "name": "GroupState", "type": "string", "versions": "4+", 
"nullableVersions": "0+", "ignorable": true, "default": "null",

Review comment:
       Is it intentional to use nullable versions 0+? I'm surprised the 
generator doesn't fail.

##########
File path: clients/src/test/java/org/apache/kafka/clients/MockClient.java
##########
@@ -215,15 +215,22 @@ public void send(ClientRequest request, long now) {
             AbstractRequest.Builder<?> builder = request.requestBuilder();
             short version = 
nodeApiVersions.latestUsableVersion(request.apiKey(), 
builder.oldestAllowedVersion(),
                     builder.latestAllowedVersion());
-            AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
-            if (!futureResp.requestMatcher.matches(abstractRequest))
-                throw new IllegalStateException("Request matcher did not match 
next-in-line request " + abstractRequest + " with prepared response " + 
futureResp.responseBody);
 
             UnsupportedVersionException unsupportedVersionException = null;
             if (futureResp.isUnsupportedRequest)
-                unsupportedVersionException = new 
UnsupportedVersionException("Api " +
-                        request.apiKey() + " with version " + version);
-
+                unsupportedVersionException = new UnsupportedVersionException(
+                        "Api " + request.apiKey() + " with version " + 
version);
+            try {
+                AbstractRequest abstractRequest = 
request.requestBuilder().build(version);

Review comment:
       It's a little odd that we go on and match the request if it is expected 
to raise an unsupported version error. Do we have test cases that depend on 
this? In `NetworkClient`, if we hit an unsupported version error, the response 
body would be null.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
##########
@@ -17,22 +17,30 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.kafka.common.ConsumerGroupState;
+
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
+    private final Optional<ConsumerGroupState> state;
 
     /**
      * Create an instance with the specified parameters.
      *
      * @param groupId Group Id
      * @param isSimpleConsumerGroup If consumer group is simple or not.
+     * @param state The state of the consumer group
      */
-    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, 
Optional<ConsumerGroupState> state) {

Review comment:
       Hmm, seems this is a public class. Would it be safer to add a 
constructor overload?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##########
@@ -26,4 +31,30 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Set<ConsumerGroupState> states = Collections.emptySet();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned with their states
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        this.states = (states == null) ? Collections.emptySet() : new 
HashSet<>(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {

Review comment:
       Do we need this API? Seems this is the default behavior.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -3052,14 +3052,23 @@ void handleResponse(AbstractResponse abstractResponse) {
                     runnable.call(new Call("listConsumerGroups", deadline, new 
ConstantNodeIdProvider(node.id())) {
                         @Override
                         ListGroupsRequest.Builder createRequest(int timeoutMs) 
{
-                            return new ListGroupsRequest.Builder(new 
ListGroupsRequestData());
+                            List<String> states = (options.states().isEmpty())

Review comment:
       nit: I think the parenthesis are unnecessary?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
##########
@@ -17,22 +17,30 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.kafka.common.ConsumerGroupState;
+
 /**
  * A listing of a consumer group in the cluster.
  */
 public class ConsumerGroupListing {
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
+    private final Optional<ConsumerGroupState> state;
 
     /**
      * Create an instance with the specified parameters.
      *
      * @param groupId Group Id
      * @param isSimpleConsumerGroup If consumer group is simple or not.
+     * @param state The state of the consumer group
      */
-    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, 
Optional<ConsumerGroupState> state) {
         this.groupId = groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.state = state;

Review comment:
       nit: maybe use `requireNonNull`?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -1004,9 +1061,9 @@ object ConsumerGroupCommand extends Logging {
     val offsetsOpt = parser.accepts("offsets", OffsetsDoc)
                            .availableIf(describeOpt)
     val stateOpt = parser.accepts("state", StateDoc)
-                         .availableIf(describeOpt)
-
-    parser.mutuallyExclusive(membersOpt, offsetsOpt, stateOpt)
+                         .availableIf(describeOpt, listOpt)
+                         .withOptionalArg()

Review comment:
       Since we have an optional argument, do we need to validate that it is 
not provided when `--describe` is used?

##########
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##########
@@ -178,12 +200,44 @@ object ConsumerGroupCommand extends Logging {
       } else None
     }
 
-    def listGroups(): List[String] = {
+    def listGroups(): Unit = {
+      if (opts.options.has(opts.stateOpt)) {
+           val stateValue = opts.options.valueOf(opts.stateOpt)
+           val states = if (stateValue == null || stateValue.isEmpty)
+             allStates

Review comment:
       Seems we need to update this now that we use empty list to request all 
states.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1312,6 +1324,79 @@ public void testListConsumerGroupsMetadataFailure() 
throws Exception {
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithStates() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                new ListGroupsResponse(new ListGroupsResponseData()
+                    .setErrorCode(Errors.NONE.code())
+                    .setGroups(Arrays.asList(
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-1")
+                                
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+                                .setGroupState("Stable"),
+                            new ListGroupsResponseData.ListedGroup()
+                                .setGroupId("group-2")
+                                .setGroupState("Empty")))),
+                env.cluster().nodeById(0));
+
+            final ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions().inAnyState();
+            final ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+
+            assertEquals(2, listings.size());
+            List<ConsumerGroupListing> expected = new ArrayList<>();
+            expected.add(new ConsumerGroupListing("group-2", true, 
Optional.of(ConsumerGroupState.EMPTY)));
+            expected.add(new ConsumerGroupListing("group-1", false, 
Optional.of(ConsumerGroupState.STABLE)));
+            assertEquals(expected, listings);
+            assertEquals(0, result.errors().get().size());
+        }
+    }
+
+    @Test
+    public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws 
Exception {
+        ApiVersion listGroupV3 = new ApiVersion(ApiKeys.LIST_GROUPS.id, 
(short) 0, (short) 3);
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3)));
+
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            // Check we can list groups with older broker if we don't specify 
states
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(new ListGroupsResponseData()
+                        .setErrorCode(Errors.NONE.code())
+                        .setGroups(Collections.singletonList(
+                                new ListGroupsResponseData.ListedGroup()
+                                    .setGroupId("group-1")
+                                    
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
+                    env.cluster().nodeById(0));
+            ListConsumerGroupsOptions options = new 
ListConsumerGroupsOptions().inAnyState();
+            ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups(options);
+            Collection<ConsumerGroupListing> listing = result.all().get();
+            assertEquals(1, listing.size());
+            List<ConsumerGroupListing> expected = 
Collections.singletonList(new ConsumerGroupListing("group-1", false, 
Optional.empty()));
+            assertEquals(expected, listing);
+
+            // But we cannot set a state filter with older broker
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                body -> body instanceof ListGroupsRequest);
+
+            options = new 
ListConsumerGroupsOptions().inStates(Collections.singleton(ConsumerGroupState.STABLE));
+            result = env.adminClient().listConsumerGroups(options);
+            try {
+                result.all().get();
+                fail("Should have thrown");

Review comment:
       nit: you can use `TestUtils.assertFutureThrows`

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -797,12 +797,17 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  def handleListGroups(): (Errors, List[GroupOverview]) = {
+  def handleListGroups(states: List[String]): (Errors, List[GroupOverview]) = {

Review comment:
       Could this be a `Set`?

##########
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##########
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the StatesFilter field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "StatesFilter", "type": "[]string", "versions": "4+", 
"nullableVersions": "4+", "default": "null",
+      "about": "The states of the groups we want to list. If empty or null, 
all groups are returned with their state.", "fields": [
+      { "name": "Name", "type": "string", "versions": "4+", "about": "The name 
of the group state" }

Review comment:
       Seems this is ignored because the type of `StatesFilter` is `[]string`. 
Sort of feels like a bug in the schema validation that this gets ignored. In 
any case, should we remove it?

##########
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##########
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the StatesFilter field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "StatesFilter", "type": "[]string", "versions": "4+", 
"nullableVersions": "4+", "default": "null",

Review comment:
       The behavior of null is identical to an empty list in terms of 
semantics. However, if an empty list is provided, the request cannot be 
serialized with older versions because the field is marked as ignorable. Would 
it be simpler if we made this field non-nullable and used the default of an 
empty array so that we don't have this inconsistency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to