[ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438777#comment-16438777
 ] 

ASF GitHub Bot commented on KAFKA-6058:
---------------------------------------

guozhangwang closed pull request #4856: KAFKA-6058: Refactor consumer API 
result return types
URL: https://github.com/apache/kafka/pull/4856
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once the pull request is merged, it is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise remain visible after the merge):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
index b4bce264405..dd6835cf10c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -29,13 +29,24 @@
  */
 @InterfaceStability.Evolving
 public class DeleteConsumerGroupsResult {
-    final KafkaFuture<Map<String, KafkaFuture<Void>>> futures;
+    private final Map<String, KafkaFuture<Void>> futures;
 
-    DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> 
futures) {
+    DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
         this.futures = futures;
     }
 
-    public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() {
+    /**
+     * Return a map from group id to futures which can be used to check the 
status of
+     * individual deletions.
+     */
+    public Map<String, KafkaFuture<Void>> deletedGroups() {
         return futures;
     }
+
+    /**
+     * Return a future which succeeds only if all the consumer group deletions 
succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index adde031b678..ac2189cc6dc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -32,16 +32,23 @@
 @InterfaceStability.Evolving
 public class DescribeConsumerGroupsResult {
 
-    private final KafkaFuture<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> futures;
+    private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
 
-    public DescribeConsumerGroupsResult(KafkaFuture<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> futures) {
+    public DescribeConsumerGroupsResult(final Map<String, 
KafkaFuture<ConsumerGroupDescription>> futures) {
         this.futures = futures;
     }
 
     /**
-     * Return a map from group name to futures which can be used to check the 
description of a consumer group.
+     * Return a map from group id to futures which can be used to check the 
description of a consumer group.
      */
-    public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> 
describedGroups() {
+    public Map<String, KafkaFuture<ConsumerGroupDescription>> 
describedGroups() {
         return futures;
     }
+
+    /**
+     * Return a future which succeeds only if all the consumer group 
description succeed.
+     */
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 50bcfd38856..fa3f943555b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -46,6 +46,7 @@
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -53,6 +54,7 @@
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -916,8 +918,11 @@ private void failCalls(long now, List<Call> calls, 
AuthenticationException authe
          * @param correlationIdToCall   A map of correlation IDs to calls.
          * @param callsInFlight         A map of nodes to the calls they have 
in flight.
         **/
-        private void handleResponses(long now, List<ClientResponse> responses, 
Map<String, List<Call>> callsInFlight,
-                Map<Integer, Call> correlationIdToCall) {
+        private void handleResponses(long now,
+                                     List<ClientResponse> responses,
+                                     Map<String, List<Call>> callsInFlight,
+                                     Map<Integer, Call> correlationIdToCall) {
+
             for (ClientResponse response : responses) {
                 int correlationId = response.requestHeader().correlationId();
 
@@ -1108,7 +1113,11 @@ void call(Call call, long now) {
      * those policies on the server, so that they can be changed in the future 
if needed.
      */
     private static boolean topicNameIsUnrepresentable(String topicName) {
-        return (topicName == null) || topicName.isEmpty();
+        return topicName == null || topicName.isEmpty();
+    }
+
+    private static boolean groupIdIsUnrepresentable(String groupId) {
+        return groupId == null;
     }
 
     @Override
@@ -1951,6 +1960,7 @@ void handleFailure(Throwable throwable) {
         return new DescribeReplicaLogDirsResult(new 
HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
     }
 
+    @Override
     public CreatePartitionsResult createPartitions(Map<String, NewPartitions> 
newPartitions,
                                                    final 
CreatePartitionsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> futures = new 
HashMap<>(newPartitions.size());
@@ -1989,6 +1999,7 @@ void handleFailure(Throwable throwable) {
         return new CreatePartitionsResult(new HashMap<String, 
KafkaFuture<Void>>(futures));
     }
 
+    @Override
     public DeleteRecordsResult deleteRecords(final Map<TopicPartition, 
RecordsToDelete> recordsToDelete,
                                              final DeleteRecordsOptions 
options) {
 
@@ -2228,20 +2239,30 @@ void handleFailure(Throwable throwable) {
     @Override
     public DescribeConsumerGroupsResult describeConsumerGroups(final 
Collection<String> groupIds,
                                                                final 
DescribeConsumerGroupsOptions options) {
-        final KafkaFutureImpl<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>();
-        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> 
consumerGroupFutures = new HashMap<>(groupIds.size());
-        final ArrayList<String> groupIdList = new ArrayList<>();
-        for (String groupId : groupIds) {
-            if (!consumerGroupFutures.containsKey(groupId)) {
-                consumerGroupFutures.put(groupId, new 
KafkaFutureImpl<ConsumerGroupDescription>());
-                groupIdList.add(groupId);
+
+        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = 
new HashMap<>(groupIds.size());
+        for (String groupId: groupIds) {
+            if (groupIdIsUnrepresentable(groupId)) {
+                KafkaFutureImpl<ConsumerGroupDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidGroupIdException("The 
given group id '" +
+                        groupId + "' cannot be represented in a request."));
+                futures.put(groupId, future);
+            } else if (!futures.containsKey(groupId)) {
+                futures.put(groupId, new 
KafkaFutureImpl<ConsumerGroupDescription>());
             }
         }
 
-        for (final String groupId : groupIdList) {
+        // TODO: KAFKA-6788, we should consider grouping the request per 
coordinator and send one request with a list of
+        // all consumer groups this coordinator host
+        for (final Map.Entry<String, 
KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) {
+            // skip sending request for those futures that already failed.
+            if (entry.getValue().isCompletedExceptionally())
+                continue;
+
+            final String groupId = entry.getKey();
 
-            final long nowFindCoordinator = time.milliseconds();
-            final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+            final long startFindCoordinatorMs = time.milliseconds();
+            final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
 
             runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
                 @Override
@@ -2261,23 +2282,21 @@ void handleResponse(AbstractResponse abstractResponse) {
 
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new 
DescribeGroupsRequest.Builder(groupIdList);
+                            return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) 
{
                             final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
-                            // Handle server responses for particular groupId.
-                            for (Map.Entry<String, 
KafkaFutureImpl<ConsumerGroupDescription>> entry : 
consumerGroupFutures.entrySet()) {
-                                final String groupId = entry.getKey();
-                                final 
KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue();
-                                final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
-                                final Errors groupError = 
groupMetadata.error();
-                                if (groupError != Errors.NONE) {
-                                    
future.completeExceptionally(groupError.exception());
-                                    continue;
-                                }
 
+                            KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
+                            final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+
+                            final Errors groupError = groupMetadata.error();
+                            if (groupError != Errors.NONE) {
+                                // TODO: KAFKA-6789, we can retry based on the 
error code
+                                
future.completeExceptionally(groupError.exception());
+                            } else {
                                 final String protocolType = 
groupMetadata.protocolType();
                                 if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
                                     final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
@@ -2306,27 +2325,28 @@ void handleResponse(AbstractResponse abstractResponse) {
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            
completeAllExceptionally(consumerGroupFutures.values(), throwable);
+                            KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowDescribeConsumerGroups);
-
-                    resultFutures.complete(new HashMap<String, 
KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures));
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    resultFutures.completeExceptionally(throwable);
+                    KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
+                    future.completeExceptionally(throwable);
                 }
-            }, nowFindCoordinator);
+            }, startFindCoordinatorMs);
         }
 
-        return new DescribeConsumerGroupsResult(resultFutures);
+        return new DescribeConsumerGroupsResult(new HashMap<String, 
KafkaFuture<ConsumerGroupDescription>>(futures));
     }
 
     @Override
     public ListConsumerGroupsResult 
listConsumerGroups(ListConsumerGroupsOptions options) {
-        //final KafkaFutureImpl<Map<Node, 
KafkaFuture<Collection<ConsumerGroupListing>>>> nodeAndConsumerGroupListing = 
new KafkaFutureImpl<>();
-        final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new 
KafkaFutureImpl<Collection<ConsumerGroupListing>>();
+        final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> 
futuresMap = new HashMap<>();
+        final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture 
= new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
 
         final long nowMetadata = time.milliseconds();
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
@@ -2334,49 +2354,74 @@ public ListConsumerGroupsResult 
listConsumerGroups(ListConsumerGroupsOptions opt
         runnable.call(new Call("listNodes", deadline, new 
LeastLoadedNodeProvider()) {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new 
MetadataRequest.Builder(Collections.<String>emptyList(), true);
+                return new 
MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME),
 true);
             }
 
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) 
abstractResponse;
 
-                final Map<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>();
-
-                for (final Node node : metadataResponse.brokers()) {
-                    futures.put(node, new 
KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                for (final MetadataResponse.TopicMetadata metadata : 
metadataResponse.topicMetadata()) {
+                    if 
(metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                        for (final MetadataResponse.PartitionMetadata 
partitionMetadata : metadata.partitionMetadata()) {
+                            final Node leader = partitionMetadata.leader();
+                            if (partitionMetadata.error() != Errors.NONE) {
+                                // TODO: KAFKA-6789, retry based on the error 
code
+                                
KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new 
KafkaFutureImpl<>();
+                                
future.completeExceptionally(partitionMetadata.error().exception());
+                                // if it is the leader not found error, then 
the leader might be NoNode; if there are more than
+                                // one such error, we will only have one entry 
in the map. For now it is okay since we are not
+                                // guaranteeing to return the full list of 
consumers still.
+                                futuresMap.put(leader, future);
+                            } else {
+                                futuresMap.put(leader, new 
KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                            }
+                        }
+                        listFuture.complete(null);
+                    } else {
+                        if (metadata.error() != Errors.NONE)
+                            
listFuture.completeExceptionally(metadata.error().exception());
+                        else
+                            listFuture.completeExceptionally(new 
IllegalStateException("Unexpected topic metadata for "
+                                    + metadata.topic() + " is returned; cannot 
find the brokers to query consumer listings."));
+                    }
                 }
 
-                future.combine(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
-                        new 
KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, 
Collection<ConsumerGroupListing>>() {
+                // we have to flatten the future here instead in the result, 
because we need to wait until the map of nodes
+                // are known from the listNode request.
+                flattenFuture.copyWith(
+                        KafkaFuture.allOf(futuresMap.values().toArray(new 
KafkaFuture[0])),
+                        new KafkaFuture.BaseFunction<Void, 
Collection<ConsumerGroupListing>>() {
                             @Override
-                            public Collection<ConsumerGroupListing> 
apply(Collection<ConsumerGroupListing> v) {
+                            public Collection<ConsumerGroupListing> apply(Void 
v) {
                                 List<ConsumerGroupListing> listings = new 
ArrayList<>();
-                                for (Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                                for (Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : 
futuresMap.entrySet()) {
                                     Collection<ConsumerGroupListing> results;
                                     try {
                                         results = entry.getValue().get();
+                                        listings.addAll(results);
                                     } catch (Throwable e) {
-                                        // This should be unreachable, since 
the future returned by KafkaFuture#allOf should
-                                        // have failed if any Future failed.
-                                        throw new 
KafkaException("ListConsumerGroupsResult#listings(): internal error", e);
+                                        // This should be unreachable, because 
allOf ensured that all the futures
+                                        // completed successfully.
+                                        throw new RuntimeException(e);
                                     }
-                                    listings.addAll(results);
                                 }
                                 return listings;
                             }
                         });
 
+                for (final Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : 
futuresMap.entrySet()) {
+                    // skip sending the request for those futures who have 
already failed
+                    if (entry.getValue().isCompletedExceptionally())
+                        continue;
 
-                for (final Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
                     final long nowList = time.milliseconds();
 
                     final int brokerId = entry.getKey().id();
+                    final KafkaFutureImpl<Collection<ConsumerGroupListing>> 
future = entry.getValue();
 
                     runnable.call(new Call("listConsumerGroups", deadline, new 
ConstantNodeIdProvider(brokerId)) {
 
-                        private final 
KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
-
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new ListGroupsRequest.Builder();
@@ -2385,21 +2430,26 @@ void handleResponse(AbstractResponse abstractResponse) {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) 
{
                             final ListGroupsResponse response = 
(ListGroupsResponse) abstractResponse;
-                            final List<ConsumerGroupListing> groupsListing = 
new ArrayList<>();
-                            for (ListGroupsResponse.Group group : 
response.groups()) {
-                                if 
(group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || 
group.protocolType().isEmpty()) {
-                                    final String groupId = group.groupId();
-                                    final String protocolType = 
group.protocolType();
-                                    final ConsumerGroupListing groupListing = 
new ConsumerGroupListing(groupId, protocolType.isEmpty());
-                                    groupsListing.add(groupListing);
+
+                            if (response.error() != Errors.NONE) {
+                                
future.completeExceptionally(response.error().exception());
+                            } else {
+                                final List<ConsumerGroupListing> groupsListing 
= new ArrayList<>();
+                                for (ListGroupsResponse.Group group : 
response.groups()) {
+                                    if 
(group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || 
group.protocolType().isEmpty()) {
+                                        final String groupId = group.groupId();
+                                        final String protocolType = 
group.protocolType();
+                                        final ConsumerGroupListing 
groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                        groupsListing.add(groupListing);
+                                    }
                                 }
+                                future.complete(groupsListing);
                             }
-                            future.complete(groupsListing);
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            completeAllExceptionally(futures.values(), 
throwable);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowList);
 
@@ -2408,19 +2458,19 @@ void handleFailure(Throwable throwable) {
 
             @Override
             void handleFailure(Throwable throwable) {
-                future.completeExceptionally(throwable);
+                listFuture.completeExceptionally(throwable);
             }
         }, nowMetadata);
 
-        return new ListConsumerGroupsResult(future);
+        return new ListConsumerGroupsResult(listFuture, flattenFuture, 
futuresMap);
     }
 
     @Override
     public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final 
String groupId, final ListConsumerGroupOffsetsOptions options) {
         final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> 
groupOffsetListingFuture = new KafkaFutureImpl<>();
 
-        final long nowFindCoordinator = time.milliseconds();
-        final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+        final long startFindCoordinatorMs = time.milliseconds();
+        final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
 
         runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
             @Override
@@ -2446,14 +2496,25 @@ void handleResponse(AbstractResponse abstractResponse) {
                     void handleResponse(AbstractResponse abstractResponse) {
                         final OffsetFetchResponse response = 
(OffsetFetchResponse) abstractResponse;
                         final Map<TopicPartition, OffsetAndMetadata> 
groupOffsetsListing = new HashMap<>();
-                        for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> entry :
-                                response.responseData().entrySet()) {
-                            final TopicPartition topicPartition = 
entry.getKey();
-                            final Long offset = entry.getValue().offset;
-                            final String metadata = entry.getValue().metadata;
-                            groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, metadata));
+
+                        if (response.hasError()) {
+                            
groupOffsetListingFuture.completeExceptionally(response.error().exception());
+                        } else {
+                            for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> entry :
+                                    response.responseData().entrySet()) {
+                                final TopicPartition topicPartition = 
entry.getKey();
+                                final Errors error = entry.getValue().error;
+
+                                if (error == Errors.NONE) {
+                                    final Long offset = 
entry.getValue().offset;
+                                    final String metadata = 
entry.getValue().metadata;
+                                    groupOffsetsListing.put(topicPartition, 
new OffsetAndMetadata(offset, metadata));
+                                } else {
+                                    log.warn("Skipping return offset for {} 
due to error {}.", topicPartition, error);
+                                }
+                            }
+                            
groupOffsetListingFuture.complete(groupOffsetsListing);
                         }
-                        groupOffsetListingFuture.complete(groupOffsetsListing);
                     }
 
                     @Override
@@ -2467,27 +2528,35 @@ void handleFailure(Throwable throwable) {
             void handleFailure(Throwable throwable) {
                 groupOffsetListingFuture.completeExceptionally(throwable);
             }
-        }, nowFindCoordinator);
+        }, startFindCoordinatorMs);
 
         return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
     }
 
     @Override
     public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds, DeleteConsumerGroupsOptions options) {
-        final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> 
deleteConsumerGroupsFuture = new KafkaFutureImpl<>();
-        final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = 
new HashMap<>(groupIds.size());
-        final Set<String> groupIdList = new HashSet<>();
-        for (String groupId : groupIds) {
-            if (!deleteConsumerGroupFutures.containsKey(groupId)) {
-                deleteConsumerGroupFutures.put(groupId, new 
KafkaFutureImpl<Void>());
-                groupIdList.add(groupId);
+
+        final Map<String, KafkaFutureImpl<Void>> futures = new 
HashMap<>(groupIds.size());
+        for (String groupId: groupIds) {
+            if (groupIdIsUnrepresentable(groupId)) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new ApiException("The given group 
id '" +
+                        groupId + "' cannot be represented in a request."));
+                futures.put(groupId, future);
+            } else if (!futures.containsKey(groupId)) {
+                futures.put(groupId, new KafkaFutureImpl<Void>());
             }
         }
 
-        for (final String groupId : groupIdList) {
+        // TODO: KAFKA-6788, we should consider grouping the request per 
coordinator and send one request with a list of
+        // all consumer groups this coordinator host
+        for (final String groupId : groupIds) {
+            // skip sending request for those futures that already failed.
+            if (futures.get(groupId).isCompletedExceptionally())
+                continue;
 
-            final long nowFindCoordinator = time.milliseconds();
-            final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+            final long startFindCoordinatorMs = time.milliseconds();
+            final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
 
             runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
                 @Override
@@ -2513,36 +2582,33 @@ void handleResponse(AbstractResponse abstractResponse) {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) 
{
                             final DeleteGroupsResponse response = 
(DeleteGroupsResponse) abstractResponse;
-                            // Handle server responses for particular groupId.
-                            for (Map.Entry<String, KafkaFutureImpl<Void>> 
entry : deleteConsumerGroupFutures.entrySet()) {
-                                final String groupId = entry.getKey();
-                                final KafkaFutureImpl<Void> future = 
entry.getValue();
-                                final Errors groupError = 
response.get(groupId);
-                                if (groupError != Errors.NONE) {
-                                    
future.completeExceptionally(groupError.exception());
-                                    continue;
-                                }
 
+                            KafkaFutureImpl<Void> future = 
futures.get(groupId);
+                            final Errors groupError = response.get(groupId);
+
+                            if (groupError != Errors.NONE) {
+                                
future.completeExceptionally(groupError.exception());
+                            } else {
                                 future.complete(null);
                             }
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            
completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable);
+                            KafkaFutureImpl<Void> future = 
futures.get(groupId);
+                            future.completeExceptionally(throwable);
                         }
                     }, nowDeleteConsumerGroups);
-
-                    deleteConsumerGroupsFuture.complete(new HashMap<String, 
KafkaFuture<Void>>(deleteConsumerGroupFutures));
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    
deleteConsumerGroupsFuture.completeExceptionally(throwable);
+                    KafkaFutureImpl<Void> future = futures.get(groupId);
+                    future.completeExceptionally(throwable);
                 }
-            }, nowFindCoordinator);
+            }, startFindCoordinatorMs);
         }
 
-        return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture);
+        return new DeleteConsumerGroupsResult(new HashMap<String, 
KafkaFuture<Void>>(futures));
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index c7253710440..c3f1236eb09 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -18,9 +18,14 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.AbstractIterator;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * The result of the {@link AdminClient#listConsumerGroups()} call.
@@ -29,16 +34,70 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsResult {
-    private final KafkaFuture<Collection<ConsumerGroupListing>> future;
+    private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> 
futuresMap;
+    private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture;
+    private final KafkaFuture<Void> listFuture;
 
-    ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> 
future) {
-        this.future = future;
+    ListConsumerGroupsResult(final KafkaFuture<Void> listFuture,
+                             final 
KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture,
+                             final Map<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) {
+        this.flattenFuture = flattenFuture;
+        this.listFuture = listFuture;
+        this.futuresMap = futuresMap;
+    }
+
+    private class FutureConsumerGroupListingIterator extends 
AbstractIterator<KafkaFuture<ConsumerGroupListing>> {
+        private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> 
futuresIter;
+        private Iterator<ConsumerGroupListing> innerIter;
+
+        @Override
+        protected KafkaFuture<ConsumerGroupListing> makeNext() {
+            if (futuresIter == null) {
+                try {
+                    listFuture.get();
+                } catch (Exception e) {
+                    // the list future has failed, there will be no listings 
to show at all
+                    return allDone();
+                }
+
+                futuresIter = futuresMap.values().iterator();
+            }
+
+            while (innerIter == null || !innerIter.hasNext()) {
+                if (futuresIter.hasNext()) {
+                    KafkaFuture<Collection<ConsumerGroupListing>> 
collectionFuture = futuresIter.next();
+                    try {
+                        Collection<ConsumerGroupListing> collection = 
collectionFuture.get();
+                        innerIter = collection.iterator();
+                    } catch (Exception e) {
+                        KafkaFutureImpl<ConsumerGroupListing> future = new 
KafkaFutureImpl<>();
+                        future.completeExceptionally(e);
+                        return future;
+                    }
+                } else {
+                    return allDone();
+                }
+            }
+
+            KafkaFutureImpl<ConsumerGroupListing> future = new 
KafkaFutureImpl<>();
+            future.complete(innerIter.next());
+            return future;
+        }
+    }
+
+    /**
+     * Return an iterator of futures for ConsumerGroupListing objects; the 
returned future will throw exception
+     * if we cannot get a complete collection of consumer listings.
+     */
+    public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() {
+        return new FutureConsumerGroupListingIterator();
     }
 
     /**
-     * Return a future which yields a collection of ConsumerGroupListing 
objects.
+     * Return a future which yields a full collection of ConsumerGroupListing 
objects; will throw exception
+     * if we cannot get a complete collection of consumer listings.
      */
-    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
-        return future;
+    public KafkaFuture<Collection<ConsumerGroupListing>> all() {
+        return flattenFuture;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 4af996cb6b8..9cd2e01dc42 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -106,15 +106,6 @@ private void maybeComplete() {
         return allOfFuture;
     }
 
-    public KafkaFuture<T> combine(KafkaFuture<?>... futures) {
-        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, 
this);
-        for (KafkaFuture<?> future : futures) {
-            future.addWaiter(allOfWaiter);
-        }
-
-        return this;
-    }
-
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
      * futures's result as the argument to the supplied function.
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index b1e5b6dc205..33916ac952a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -141,11 +141,15 @@ R await(long timeout, TimeUnit unit)
      */
     @Override
     public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) {
-        KafkaFutureImpl<R> future = new KafkaFutureImpl<R>();
+        KafkaFutureImpl<R> future = new KafkaFutureImpl<>();
         addWaiter(new Applicant<>(function, future));
         return future;
     }
 
+    public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> 
function) {
+        KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future;
+        futureImpl.addWaiter(new Applicant<>(function, this));
+    }
 
     /**
      * @See KafkaFutureImpl#thenApply(BaseFunction)
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index a73175c9954..37b43e5e36e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -73,6 +73,7 @@ public FutureResponse(Node node,
 
     }
 
+    private int correlation;
     private final Time time;
     private final Metadata metadata;
     private Set<String> unavailableTopics;
@@ -464,7 +465,7 @@ public ClientRequest newClientRequest(String nodeId, 
AbstractRequest.Builder<?>
     public ClientRequest newClientRequest(String nodeId, 
AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse, 
RequestCompletionHandler callback) {
         totalRequestCount.incrementAndGet();
-        return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", 
createdTimeMs,
+        return new ClientRequest(nodeId, requestBuilder, correlation++, 
"mockClientId", createdTimeMs,
                 expectResponse, callback);
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2244135035..d2789b62621 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
@@ -79,6 +80,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -647,11 +649,15 @@ public void testDeleteRecords() throws Exception {
     }
 
     //Ignoring test to be fixed on follow-up PR
-    @Ignore
     @Test
-    public void testListConsumerGroups() throws Exception {
+    public void testListConsumerGroups() {
         final HashMap<Integer, Node> nodes = new HashMap<>();
-        nodes.put(0, new Node(0, "localhost", 8121));
+        Node node0 = new Node(0, "localhost", 8121);
+        Node node1 = new Node(1, "localhost", 8122);
+        Node node2 = new Node(2, "localhost", 8123);
+        nodes.put(0, node0);
+        nodes.put(1, node1);
+        nodes.put(2, node2);
 
         final Cluster cluster =
             new Cluster(
@@ -666,27 +672,72 @@ public void testListConsumerGroups() throws Exception {
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
-            env.kafkaClient().prepareResponse(
-                new MetadataResponse(
-                    env.cluster().nodes(),
-                    env.cluster().clusterResource().clusterId(),
-                    env.cluster().controller().id(),
-                    new ArrayList<MetadataResponse.TopicMetadata>()));
+            List<MetadataResponse.PartitionMetadata> partitionMetadata = new 
ArrayList<>();
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0,
+                    Collections.singletonList(node0), 
Collections.singletonList(node0), Collections.<Node>emptyList()));
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1,
+                    Collections.singletonList(node1), 
Collections.singletonList(node1), Collections.<Node>emptyList()));
+            partitionMetadata.add(new 
MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2,
+                    Collections.singletonList(node2), 
Collections.singletonList(node2), Collections.<Node>emptyList()));
 
             env.kafkaClient().prepareResponse(
-                new ListGroupsResponse(
-                    Errors.NONE,
-                    Arrays.asList(
-                        new ListGroupsResponse.Group("group-1", 
ConsumerProtocol.PROTOCOL_TYPE),
-                        new ListGroupsResponse.Group("group-connect-1", 
"connector")
-                    )));
+                    new MetadataResponse(
+                            env.cluster().nodes(),
+                            env.cluster().clusterResource().clusterId(),
+                            env.cluster().controller().id(),
+                            Collections.singletonList(new 
MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, 
true, partitionMetadata))));
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            Arrays.asList(
+                                    new ListGroupsResponse.Group("group-1", 
ConsumerProtocol.PROTOCOL_TYPE),
+                                    new 
ListGroupsResponse.Group("group-connect-1", "connector")
+                            )),
+                    node0);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.COORDINATOR_NOT_AVAILABLE,
+                            Collections.<ListGroupsResponse.Group>emptyList()
+                    ),
+                    node1);
+
+            env.kafkaClient().prepareResponseFrom(
+                    new ListGroupsResponse(
+                            Errors.NONE,
+                            Arrays.asList(
+                                    new ListGroupsResponse.Group("group-2", 
ConsumerProtocol.PROTOCOL_TYPE),
+                                    new 
ListGroupsResponse.Group("group-connect-2", "connector")
+                            )),
+                    node2);
 
             final ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups();
-            final List<ConsumerGroupListing> consumerGroups = new 
ArrayList<>();
 
-            final KafkaFuture<Collection<ConsumerGroupListing>> listings = 
result.listings();
-            consumerGroups.addAll(listings.get());
-            assertEquals(1, consumerGroups.size());
+            try {
+                Collection<ConsumerGroupListing> listing = result.all().get();
+                fail("Expected to throw exception");
+            } catch (Exception e) {
+                // this is good
+            }
+
+            Iterator<KafkaFuture<ConsumerGroupListing>> iterator = 
result.iterator();
+            int numListing = 0;
+            int numFailure = 0;
+
+            while (iterator.hasNext()) {
+                KafkaFuture<ConsumerGroupListing> future = iterator.next();
+                try {
+                    ConsumerGroupListing listing = future.get();
+                    numListing++;
+                    assertTrue(listing.groupId().equals("group-1") || 
listing.groupId().equals("group-2"));
+                } catch (Exception e) {
+                    numFailure++;
+                }
+            }
+
+            assertEquals(2, numListing);
+            assertEquals(1, numFailure);
         }
     }
 
@@ -746,17 +797,15 @@ public void testDescribeConsumerGroups() throws Exception 
{
             env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(groupMetadataMap));
 
             final DescribeConsumerGroupsResult result = 
env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
-            final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture 
= result.describedGroups().get().get("group-0");
-            final ConsumerGroupDescription groupDescription = 
groupDescriptionFuture.get();
+            final ConsumerGroupDescription groupDescription = 
result.describedGroups().get("group-0").get();
 
-            assertEquals(1, result.describedGroups().get().size());
+            assertEquals(1, result.describedGroups().size());
             assertEquals("group-0", groupDescription.groupId());
             assertEquals(2, groupDescription.members().size());
         }
     }
 
     @Test
-    @Ignore
     public void testDescribeConsumerGroupOffsets() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
@@ -787,12 +836,12 @@ public void testDescribeConsumerGroupOffsets() throws 
Exception {
             env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NONE, responseData));
 
             final ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets("group-0");
+            final Map<TopicPartition, OffsetAndMetadata> 
partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
 
-            assertEquals(3, 
result.partitionsToOffsetAndMetadata().get().size());
-            final TopicPartition topicPartition = 
result.partitionsToOffsetAndMetadata().get().keySet().iterator().next();
-            assertEquals("my_topic", topicPartition.topic());
-            final OffsetAndMetadata offsetAndMetadata = 
result.partitionsToOffsetAndMetadata().get().values().iterator().next();
-            assertEquals(10, offsetAndMetadata.offset());
+            assertEquals(3, partitionToOffsetAndMetadata.size());
+            assertEquals(10, 
partitionToOffsetAndMetadata.get(myTopicPartition0).offset());
+            assertEquals(0, 
partitionToOffsetAndMetadata.get(myTopicPartition1).offset());
+            assertEquals(20, 
partitionToOffsetAndMetadata.get(myTopicPartition2).offset());
         }
     }
 
@@ -824,8 +873,8 @@ public void testDeleteConsumerGroups() throws Exception {
 
             final DeleteConsumerGroupsResult result = 
env.adminClient().deleteConsumerGroups(groupIds);
 
-            final Map<String, KafkaFuture<Void>> results = 
result.deletedGroups().get();
-            assertNull(results.get("group-0").get());
+            final KafkaFuture<Void> results = 
result.deletedGroups().get("group-0");
+            assertNull(results.get());
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-222: Add "describe consumer groups" and "list consumer groups" to 
> KafkaAdminClient
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6058
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6058
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Matthias J. Sax
>            Assignee: Jorge Quilcate
>            Priority: Major
>              Labels: kip-222
>             Fix For: 1.2.0
>
>
> {{KafkaAdminClient}} does not allow to get information about consumer groups. 
> This feature is supported by old {{kafka.admin.AdminClient}} though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroups()}}.
> Associated KIP: KIP-222



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to