dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r919825138


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -915,23 +916,57 @@ default ListConsumerGroupsResult listConsumerGroups() {
 
     /**
      * List the consumer group offsets available in the cluster.
+     * <p>
+     * @deprecated Since 3.3.
+     * Use {@link #listConsumerGroupOffsets(Map, 
ListConsumerGroupOffsetsOptions)}.
      *
      * @param options The options to use when listing the consumer group 
offsets.
      * @return The ListGroupOffsetsResult
      */
-    ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, 
ListConsumerGroupOffsetsOptions options);
+    @Deprecated
+    default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+        ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
+            .requireStable(options.requireStable());
+        ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+                .topicPartitions(options.topicPartitions());

Review Comment:
   nit: Indentation is not consistent.



##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -17,33 +17,73 @@
 
 package org.apache.kafka.clients.admin;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 
-import java.util.Map;
-
 /**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
  * <p>
  * The API of this class is evolving, see {@link Admin} for details.
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupOffsetsResult {
 
-    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+    final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        this.future = future;
+    ListConsumerGroupOffsetsResult(final Map<CoordinatorKey, 
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+        this.futures = futures.entrySet().stream()
+                .collect(Collectors.toMap(e -> e.getKey().idValue, 
Entry::getValue));
     }
 
     /**
      * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
      * If the group does not have a committed offset for this partition, the 
corresponding value in the returned map will be null.
      */
     public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {
-        return future;
+        if (futures.size() != 1) {
+            throw new IllegalStateException("Offsets from multiple consumer 
groups were requested. " +
+                    "Use partitionsToOffsetAndMetadata(groupId) instead to get 
future for a specific group.");
+        }
+        return futures.values().iterator().next();
     }
 
+    /**
+     * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects for
+     * the specified group. If the group doesn't have a committed offset for a 
specific
+     * partition, the corresponding value in the returned map will be null.
+     */
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata(String groupId) {
+        return futures.get(groupId);

Review Comment:
   I wonder if we should check if the `groupId` was actually requested. We 
could throw an `IllegalArgumentException` instead of returning null.



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws 
Exception {
         }
     }
 
+    @Test
+    public void testBatchedListConsumerGroupOffsets() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() 
throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion findCoordinatorV3 = new ApiVersion()

Review Comment:
   Should we also add a test where the FindCoordinator supports v4 and 
OffsetFetch does not support v8? I think that this could happen in a partially 
upgraded cluster.



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -24,52 +24,116 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 public class ListConsumerGroupOffsetsHandlerTest {
 
     private final LogContext logContext = new LogContext();
-    private final String groupId = "group-id";
+    private final int throttleMs = 10;
+    private final String groupZero = "group0";
+    private final String groupOne = "group1";
+    private final String groupTwo = "group2";
+    private final List<String> groups = Arrays.asList(groupZero, groupOne, 
groupTwo);
     private final TopicPartition t0p0 = new TopicPartition("t0", 0);
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t1p0 = new TopicPartition("t1", 0);
     private final TopicPartition t1p1 = new TopicPartition("t1", 1);
-    private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, 
t1p1);
+    private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+    private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+    private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = 
Collections.singletonMap(groupZero,
+            new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, 
t1p1)));
+    private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+            new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+                put(groupZero, new 
ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+                put(groupOne, new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, 
t1p1)));
+                put(groupTwo, new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, 
t2p0, t2p1, t2p2)));
+            }};
 
     @Test
     public void testBuildRequest() {
-        ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetFetchRequest request = handler.buildBatchedRequest(1, 
singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groups().get(0).groupId());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(singleRequestMap, false, 
logContext);
+        OffsetFetchRequest request = handler.buildBatchedRequest(1, 
coordinatorKeys(groupZero)).build();
+        assertEquals(groupZero, request.data().groups().get(0).groupId());
         assertEquals(2, request.data().groups().get(0).topics().size());
         assertEquals(2, 
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
         assertEquals(2, 
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
 
+    @Test
+    public void testBuildRequestWithMultipleGroups() {
+        Map<String, ListConsumerGroupOffsetsSpec> requestMap = new 
HashMap<>(this.batchedRequestMap);
+        String groupThree = "group3";
+        requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+                .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), 
new TopicPartition("t3", 1))));
+
+        ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+        OffsetFetchRequest request1 = handler.buildBatchedRequest(1, 
coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), 
requestGroups(request1));
+
+        OffsetFetchRequest request2 = handler.buildBatchedRequest(2, 
coordinatorKeys(groupThree)).build();
+        assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+        Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new 
HashMap<>();
+        request1.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new 
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+        request2.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new 
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+        assertEquals(requestMap, builtRequests);
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = 
request1.groupIdsToTopics();
+
+        assertEquals(1, groupIdsToTopics.get(groupZero).size());
+        assertEquals(2, groupIdsToTopics.get(groupOne).size());
+        assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+        assertEquals(1, 
groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+        assertEquals(1, 
groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+        assertEquals(2, 
groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+        assertEquals(1, 
groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+        assertEquals(2, 
groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+        assertEquals(3, 
groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+        groupIdsToTopics = request2.groupIdsToTopics();
+        assertEquals(1, groupIdsToTopics.get(groupThree).size());

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -753,9 +753,10 @@ object ConsumerGroupCommand extends Logging {
 
     private def getCommittedOffsets(groupId: String): Map[TopicPartition, 
OffsetAndMetadata] = {
       adminClient.listConsumerGroupOffsets(
-        groupId,
-        withTimeoutMs(new ListConsumerGroupOffsetsOptions)
-      ).partitionsToOffsetAndMetadata.get.asScala
+        Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+        withTimeoutMs(new ListConsumerGroupOffsetsOptions()))
+        .partitionsToOffsetAndMetadata(groupId)
+        .get().asScala

Review Comment:
   nit: I had a hard time reading this block. I somehow prefer the previous 
format, I guess. I leave this up to you.



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -150,21 +347,62 @@ private void assertCompleted(
         AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> result,
         Map<TopicPartition, OffsetAndMetadata> expected
     ) {
-        CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+        CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
         assertEquals(emptySet(), result.failedKeys.keySet());
         assertEquals(emptyList(), result.unmappedKeys);
         assertEquals(singleton(key), result.completedKeys.keySet());
-        assertEquals(expected, 
result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+        assertEquals(expected, result.completedKeys.get(key));
+    }
+
+    private void assertCompletedForMultipleGroups(
+        AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition, 
OffsetAndMetadata>> result,
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> expected) {

Review Comment:
   nit: Could we format new methods like the other ones in this file in order 
to stay consistent? See `assertCompleted` for instance.



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java:
##########
@@ -24,52 +24,116 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 
 public class ListConsumerGroupOffsetsHandlerTest {
 
     private final LogContext logContext = new LogContext();
-    private final String groupId = "group-id";
+    private final int throttleMs = 10;
+    private final String groupZero = "group0";
+    private final String groupOne = "group1";
+    private final String groupTwo = "group2";
+    private final List<String> groups = Arrays.asList(groupZero, groupOne, 
groupTwo);
     private final TopicPartition t0p0 = new TopicPartition("t0", 0);
     private final TopicPartition t0p1 = new TopicPartition("t0", 1);
     private final TopicPartition t1p0 = new TopicPartition("t1", 0);
     private final TopicPartition t1p1 = new TopicPartition("t1", 1);
-    private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0, 
t1p1);
+    private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+    private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+    private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+    private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap = 
Collections.singletonMap(groupZero,
+            new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0, 
t1p1)));
+    private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+            new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+                put(groupZero, new 
ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+                put(groupOne, new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, 
t1p1)));
+                put(groupTwo, new 
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1, 
t2p0, t2p1, t2p2)));
+            }};
 
     @Test
     public void testBuildRequest() {
-        ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
-        OffsetFetchRequest request = handler.buildBatchedRequest(1, 
singleton(CoordinatorKey.byGroupId(groupId))).build();
-        assertEquals(groupId, request.data().groups().get(0).groupId());
+        ListConsumerGroupOffsetsHandler handler =
+            new ListConsumerGroupOffsetsHandler(singleRequestMap, false, 
logContext);
+        OffsetFetchRequest request = handler.buildBatchedRequest(1, 
coordinatorKeys(groupZero)).build();
+        assertEquals(groupZero, request.data().groups().get(0).groupId());
         assertEquals(2, request.data().groups().get(0).topics().size());
         assertEquals(2, 
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
         assertEquals(2, 
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
     }
 
+    @Test
+    public void testBuildRequestWithMultipleGroups() {
+        Map<String, ListConsumerGroupOffsetsSpec> requestMap = new 
HashMap<>(this.batchedRequestMap);
+        String groupThree = "group3";
+        requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+                .topicPartitions(Arrays.asList(new TopicPartition("t3", 0), 
new TopicPartition("t3", 1))));
+
+        ListConsumerGroupOffsetsHandler handler = new 
ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+        OffsetFetchRequest request1 = handler.buildBatchedRequest(1, 
coordinatorKeys(groupZero, groupOne, groupTwo)).build();
+        assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo), 
requestGroups(request1));
+
+        OffsetFetchRequest request2 = handler.buildBatchedRequest(2, 
coordinatorKeys(groupThree)).build();
+        assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+        Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new 
HashMap<>();
+        request1.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new 
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+        request2.groupIdsToPartitions().forEach((group, partitions) ->
+                builtRequests.put(group, new 
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+        assertEquals(requestMap, builtRequests);
+        Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics = 
request1.groupIdsToTopics();
+
+        assertEquals(1, groupIdsToTopics.get(groupZero).size());

Review Comment:
   nit: Should we assert `groupIdsToTopics.size()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to