[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438777#comment-16438777 ]
ASF GitHub Bot commented on KAFKA-6058: --------------------------------------- guozhangwang closed pull request #4856: KAFKA-6058: Refactor consumer API result return types URL: https://github.com/apache/kafka/pull/4856 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java index b4bce264405..dd6835cf10c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java @@ -29,13 +29,24 @@ */ @InterfaceStability.Evolving public class DeleteConsumerGroupsResult { - final KafkaFuture<Map<String, KafkaFuture<Void>>> futures; + private final Map<String, KafkaFuture<Void>> futures; - DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> futures) { + DeleteConsumerGroupsResult(final Map<String, KafkaFuture<Void>> futures) { this.futures = futures; } - public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() { + /** + * Return a map from group id to futures which can be used to check the status of + * individual deletions. + */ + public Map<String, KafkaFuture<Void>> deletedGroups() { return futures; } + + /** + * Return a future which succeeds only if all the consumer group deletions succeed. 
+ */ + public KafkaFuture<Void> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java index adde031b678..ac2189cc6dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java @@ -32,16 +32,23 @@ @InterfaceStability.Evolving public class DescribeConsumerGroupsResult { - private final KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures; + private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures; - public DescribeConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> futures) { + public DescribeConsumerGroupsResult(final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) { this.futures = futures; } /** - * Return a map from group name to futures which can be used to check the description of a consumer group. + * Return a map from group id to futures which can be used to check the description of a consumer group. */ - public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> describedGroups() { + public Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups() { return futures; } + + /** + * Return a future which succeeds only if all the consumer group descriptions succeed. 
+ */ + public KafkaFuture<Void> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 50bcfd38856..fa3f943555b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -46,6 +46,7 @@ import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RetriableException; @@ -53,6 +54,7 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -916,8 +918,11 @@ private void failCalls(long now, List<Call> calls, AuthenticationException authe * @param correlationIdToCall A map of correlation IDs to calls. * @param callsInFlight A map of nodes to the calls they have in flight. 
**/ - private void handleResponses(long now, List<ClientResponse> responses, Map<String, List<Call>> callsInFlight, - Map<Integer, Call> correlationIdToCall) { + private void handleResponses(long now, + List<ClientResponse> responses, + Map<String, List<Call>> callsInFlight, + Map<Integer, Call> correlationIdToCall) { + for (ClientResponse response : responses) { int correlationId = response.requestHeader().correlationId(); @@ -1108,7 +1113,11 @@ void call(Call call, long now) { * those policies on the server, so that they can be changed in the future if needed. */ private static boolean topicNameIsUnrepresentable(String topicName) { - return (topicName == null) || topicName.isEmpty(); + return topicName == null || topicName.isEmpty(); + } + + private static boolean groupIdIsUnrepresentable(String groupId) { + return groupId == null; } @Override @@ -1951,6 +1960,7 @@ void handleFailure(Throwable throwable) { return new DescribeReplicaLogDirsResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures)); } + @Override public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options) { final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size()); @@ -1989,6 +1999,7 @@ void handleFailure(Throwable throwable) { return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures)); } + @Override public DeleteRecordsResult deleteRecords(final Map<TopicPartition, RecordsToDelete> recordsToDelete, final DeleteRecordsOptions options) { @@ -2228,20 +2239,30 @@ void handleFailure(Throwable throwable) { @Override public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds, final DescribeConsumerGroupsOptions options) { - final KafkaFutureImpl<Map<String, KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>(); - final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> consumerGroupFutures = new 
HashMap<>(groupIds.size()); - final ArrayList<String> groupIdList = new ArrayList<>(); - for (String groupId : groupIds) { - if (!consumerGroupFutures.containsKey(groupId)) { - consumerGroupFutures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>()); - groupIdList.add(groupId); + + final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> futures = new HashMap<>(groupIds.size()); + for (String groupId: groupIds) { + if (groupIdIsUnrepresentable(groupId)) { + KafkaFutureImpl<ConsumerGroupDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidGroupIdException("The given group id '" + + groupId + "' cannot be represented in a request.")); + futures.put(groupId, future); + } else if (!futures.containsKey(groupId)) { + futures.put(groupId, new KafkaFutureImpl<ConsumerGroupDescription>()); } } - for (final String groupId : groupIdList) { + // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of + // all consumer groups this coordinator host + for (final Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) { + // skip sending request for those futures that already failed. 
+ if (entry.getValue().isCompletedExceptionally()) + continue; + + final String groupId = entry.getKey(); - final long nowFindCoordinator = time.milliseconds(); - final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override @@ -2261,23 +2282,21 @@ void handleResponse(AbstractResponse abstractResponse) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new DescribeGroupsRequest.Builder(groupIdList); + return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)); } @Override void handleResponse(AbstractResponse abstractResponse) { final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; - // Handle server responses for particular groupId. - for (Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : consumerGroupFutures.entrySet()) { - final String groupId = entry.getKey(); - final KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue(); - final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); - final Errors groupError = groupMetadata.error(); - if (groupError != Errors.NONE) { - future.completeExceptionally(groupError.exception()); - continue; - } + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); + + final Errors groupError = groupMetadata.error(); + if (groupError != Errors.NONE) { + // TODO: KAFKA-6789, we can retry based on the error code + future.completeExceptionally(groupError.exception()); + } else { final String protocolType = groupMetadata.protocolType(); if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members(); @@ -2306,27 +2325,28 @@ void handleResponse(AbstractResponse abstractResponse) { @Override void handleFailure(Throwable throwable) { - completeAllExceptionally(consumerGroupFutures.values(), throwable); + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + future.completeExceptionally(throwable); } }, nowDescribeConsumerGroups); - - resultFutures.complete(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures)); } @Override void handleFailure(Throwable throwable) { - resultFutures.completeExceptionally(throwable); + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + future.completeExceptionally(throwable); } - }, nowFindCoordinator); + }, startFindCoordinatorMs); } - return new DescribeConsumerGroupsResult(resultFutures); + return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures)); } @Override public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) { - //final KafkaFutureImpl<Map<Node, KafkaFuture<Collection<ConsumerGroupListing>>>> nodeAndConsumerGroupListing = new KafkaFutureImpl<>(); - final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<Collection<ConsumerGroupListing>>(); + final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>(); + final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>(); final long nowMetadata = time.milliseconds(); final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); @@ -2334,49 +2354,74 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return 
new MetadataRequest.Builder(Collections.<String>emptyList(), true); + return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true); } @Override void handleResponse(AbstractResponse abstractResponse) { MetadataResponse metadataResponse = (MetadataResponse) abstractResponse; - final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>(); - - for (final Node node : metadataResponse.brokers()) { - futures.put(node, new KafkaFutureImpl<Collection<ConsumerGroupListing>>()); + for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) { + if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) { + for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) { + final Node leader = partitionMetadata.leader(); + if (partitionMetadata.error() != Errors.NONE) { + // TODO: KAFKA-6789, retry based on the error code + KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>(); + future.completeExceptionally(partitionMetadata.error().exception()); + // if it is the leader not found error, then the leader might be NoNode; if there are more than + // one such error, we will only have one entry in the map. For now it is okay since we are not + // guaranteeing to return the full list of consumers still. 
+ futuresMap.put(leader, future); + } else { + futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>()); + } + } + listFuture.complete(null); + } else { + if (metadata.error() != Errors.NONE) + listFuture.completeExceptionally(metadata.error().exception()); + else + listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for " + + metadata.topic() + " is returned; cannot find the brokers to query consumer listings.")); + } } - future.combine(futures.values().toArray(new KafkaFuture[0])).thenApply( - new KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, Collection<ConsumerGroupListing>>() { + // we have to flatten the future here instead in the result, because we need to wait until the map of nodes + // are known from the listNode request. + flattenFuture.copyWith( + KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])), + new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() { @Override - public Collection<ConsumerGroupListing> apply(Collection<ConsumerGroupListing> v) { + public Collection<ConsumerGroupListing> apply(Void v) { List<ConsumerGroupListing> listings = new ArrayList<>(); - for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) { + for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) { Collection<ConsumerGroupListing> results; try { results = entry.getValue().get(); + listings.addAll(results); } catch (Throwable e) { - // This should be unreachable, since the future returned by KafkaFuture#allOf should - // have failed if any Future failed. - throw new KafkaException("ListConsumerGroupsResult#listings(): internal error", e); + // This should be unreachable, because allOf ensured that all the futures + // completed successfully. 
+ throw new RuntimeException(e); } - listings.addAll(results); } return listings; } }); + for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) { + // skip sending the request for those futures who have already failed + if (entry.getValue().isCompletedExceptionally()) + continue; - for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) { final long nowList = time.milliseconds(); final int brokerId = entry.getKey().id(); + final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue(); runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) { - private final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue(); - @Override AbstractRequest.Builder createRequest(int timeoutMs) { return new ListGroupsRequest.Builder(); @@ -2385,21 +2430,26 @@ void handleResponse(AbstractResponse abstractResponse) { @Override void handleResponse(AbstractResponse abstractResponse) { final ListGroupsResponse response = (ListGroupsResponse) abstractResponse; - final List<ConsumerGroupListing> groupsListing = new ArrayList<>(); - for (ListGroupsResponse.Group group : response.groups()) { - if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) { - final String groupId = group.groupId(); - final String protocolType = group.protocolType(); - final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty()); - groupsListing.add(groupListing); + + if (response.error() != Errors.NONE) { + future.completeExceptionally(response.error().exception()); + } else { + final List<ConsumerGroupListing> groupsListing = new ArrayList<>(); + for (ListGroupsResponse.Group group : response.groups()) { + if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) { + final String groupId = group.groupId(); + final 
String protocolType = group.protocolType(); + final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty()); + groupsListing.add(groupListing); + } } + future.complete(groupsListing); } - future.complete(groupsListing); } @Override void handleFailure(Throwable throwable) { - completeAllExceptionally(futures.values(), throwable); + future.completeExceptionally(throwable); } }, nowList); @@ -2408,19 +2458,19 @@ void handleFailure(Throwable throwable) { @Override void handleFailure(Throwable throwable) { - future.completeExceptionally(throwable); + listFuture.completeExceptionally(throwable); } }, nowMetadata); - return new ListConsumerGroupsResult(future); + return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap); } @Override public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final String groupId, final ListConsumerGroupOffsetsOptions options) { final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<>(); - final long nowFindCoordinator = time.milliseconds(); - final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override @@ -2446,14 +2496,25 @@ void handleResponse(AbstractResponse abstractResponse) { void handleResponse(AbstractResponse abstractResponse) { final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); - for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : - response.responseData().entrySet()) { - final TopicPartition topicPartition = entry.getKey(); - final Long offset = entry.getValue().offset; - final String metadata = entry.getValue().metadata; - 
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata)); + + if (response.hasError()) { + groupOffsetListingFuture.completeExceptionally(response.error().exception()); + } else { + for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : + response.responseData().entrySet()) { + final TopicPartition topicPartition = entry.getKey(); + final Errors error = entry.getValue().error; + + if (error == Errors.NONE) { + final Long offset = entry.getValue().offset; + final String metadata = entry.getValue().metadata; + groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, metadata)); + } else { + log.warn("Skipping return offset for {} due to error {}.", topicPartition, error); + } + } + groupOffsetListingFuture.complete(groupOffsetsListing); } - groupOffsetListingFuture.complete(groupOffsetsListing); } @Override @@ -2467,27 +2528,35 @@ void handleFailure(Throwable throwable) { void handleFailure(Throwable throwable) { groupOffsetListingFuture.completeExceptionally(throwable); } - }, nowFindCoordinator); + }, startFindCoordinatorMs); return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture); } @Override public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) { - final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> deleteConsumerGroupsFuture = new KafkaFutureImpl<>(); - final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = new HashMap<>(groupIds.size()); - final Set<String> groupIdList = new HashSet<>(); - for (String groupId : groupIds) { - if (!deleteConsumerGroupFutures.containsKey(groupId)) { - deleteConsumerGroupFutures.put(groupId, new KafkaFutureImpl<Void>()); - groupIdList.add(groupId); + + final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(groupIds.size()); + for (String groupId: groupIds) { + if (groupIdIsUnrepresentable(groupId)) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + 
future.completeExceptionally(new ApiException("The given group id '" + + groupId + "' cannot be represented in a request.")); + futures.put(groupId, future); + } else if (!futures.containsKey(groupId)) { + futures.put(groupId, new KafkaFutureImpl<Void>()); } } - for (final String groupId : groupIdList) { + // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of + // all consumer groups this coordinator host + for (final String groupId : groupIds) { + // skip sending request for those futures that already failed. + if (futures.get(groupId).isCompletedExceptionally()) + continue; - final long nowFindCoordinator = time.milliseconds(); - final long deadline = calcDeadlineMs(nowFindCoordinator, options.timeoutMs()); + final long startFindCoordinatorMs = time.milliseconds(); + final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); runnable.call(new Call("findCoordinator", deadline, new LeastLoadedNodeProvider()) { @Override @@ -2513,36 +2582,33 @@ void handleResponse(AbstractResponse abstractResponse) { @Override void handleResponse(AbstractResponse abstractResponse) { final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; - // Handle server responses for particular groupId. 
- for (Map.Entry<String, KafkaFutureImpl<Void>> entry : deleteConsumerGroupFutures.entrySet()) { - final String groupId = entry.getKey(); - final KafkaFutureImpl<Void> future = entry.getValue(); - final Errors groupError = response.get(groupId); - if (groupError != Errors.NONE) { - future.completeExceptionally(groupError.exception()); - continue; - } + KafkaFutureImpl<Void> future = futures.get(groupId); + final Errors groupError = response.get(groupId); + + if (groupError != Errors.NONE) { + future.completeExceptionally(groupError.exception()); + } else { future.complete(null); } } @Override void handleFailure(Throwable throwable) { - completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable); + KafkaFutureImpl<Void> future = futures.get(groupId); + future.completeExceptionally(throwable); } }, nowDeleteConsumerGroups); - - deleteConsumerGroupsFuture.complete(new HashMap<String, KafkaFuture<Void>>(deleteConsumerGroupFutures)); } @Override void handleFailure(Throwable throwable) { - deleteConsumerGroupsFuture.completeExceptionally(throwable); + KafkaFutureImpl<Void> future = futures.get(groupId); + future.completeExceptionally(throwable); } - }, nowFindCoordinator); + }, startFindCoordinatorMs); } - return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture); + return new DeleteConsumerGroupsResult(new HashMap<String, KafkaFuture<Void>>(futures)); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java index c7253710440..c3f1236eb09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java @@ -18,9 +18,14 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; import org.apache.kafka.common.annotation.InterfaceStability; 
+import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.AbstractIterator; import java.util.Collection; +import java.util.Iterator; +import java.util.Map; /** * The result of the {@link AdminClient#listConsumerGroups()} call. @@ -29,16 +34,70 @@ */ @InterfaceStability.Evolving public class ListConsumerGroupsResult { - private final KafkaFuture<Collection<ConsumerGroupListing>> future; + private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap; + private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture; + private final KafkaFuture<Void> listFuture; - ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> future) { - this.future = future; + ListConsumerGroupsResult(final KafkaFuture<Void> listFuture, + final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture, + final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) { + this.flattenFuture = flattenFuture; + this.listFuture = listFuture; + this.futuresMap = futuresMap; + } + + private class FutureConsumerGroupListingIterator extends AbstractIterator<KafkaFuture<ConsumerGroupListing>> { + private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresIter; + private Iterator<ConsumerGroupListing> innerIter; + + @Override + protected KafkaFuture<ConsumerGroupListing> makeNext() { + if (futuresIter == null) { + try { + listFuture.get(); + } catch (Exception e) { + // the list future has failed, there will be no listings to show at all + return allDone(); + } + + futuresIter = futuresMap.values().iterator(); + } + + while (innerIter == null || !innerIter.hasNext()) { + if (futuresIter.hasNext()) { + KafkaFuture<Collection<ConsumerGroupListing>> collectionFuture = futuresIter.next(); + try { + Collection<ConsumerGroupListing> collection = collectionFuture.get(); + innerIter = collection.iterator(); + } catch (Exception e) { + KafkaFutureImpl<ConsumerGroupListing> future = new 
KafkaFutureImpl<>(); + future.completeExceptionally(e); + return future; + } + } else { + return allDone(); + } + } + + KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>(); + future.complete(innerIter.next()); + return future; + } + } + + /** + * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception + * if we cannot get a complete collection of consumer listings. + */ + public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() { + return new FutureConsumerGroupListingIterator(); } /** - * Return a future which yields a collection of ConsumerGroupListing objects. + * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception + * if we cannot get a complete collection of consumer listings. */ - public KafkaFuture<Collection<ConsumerGroupListing>> listings() { - return future; + public KafkaFuture<Collection<ConsumerGroupListing>> all() { + return flattenFuture; } } diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 4af996cb6b8..9cd2e01dc42 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -106,15 +106,6 @@ private void maybeComplete() { return allOfFuture; } - public KafkaFuture<T> combine(KafkaFuture<?>... futures) { - AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, this); - for (KafkaFuture<?> future : futures) { - future.addWaiter(allOfWaiter); - } - - return this; - } - /** * Returns a new KafkaFuture that, when this future completes normally, is executed with this * futures's result as the argument to the supplied function. 
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index b1e5b6dc205..33916ac952a 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -141,11 +141,15 @@ R await(long timeout, TimeUnit unit) */ @Override public <R> KafkaFuture<R> thenApply(BaseFunction<T, R> function) { - KafkaFutureImpl<R> future = new KafkaFutureImpl<R>(); + KafkaFutureImpl<R> future = new KafkaFutureImpl<>(); addWaiter(new Applicant<>(function, future)); return future; } + public <R> void copyWith(KafkaFuture<R> future, BaseFunction<R, T> function) { + KafkaFutureImpl<R> futureImpl = (KafkaFutureImpl<R>) future; + futureImpl.addWaiter(new Applicant<>(function, this)); + } /** * @See KafkaFutureImpl#thenApply(BaseFunction) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index a73175c9954..37b43e5e36e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -73,6 +73,7 @@ public FutureResponse(Node node, } + private int correlation; private final Time time; private final Metadata metadata; private Set<String> unavailableTopics; @@ -464,7 +465,7 @@ public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs, boolean expectResponse, RequestCompletionHandler callback) { totalRequestCount.incrementAndGet(); - return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs, + return new ClientRequest(nodeId, requestBuilder, correlation++, "mockClientId", createdTimeMs, expectResponse, callback); } diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d2244135035..d2789b62621 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; @@ -79,6 +80,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -647,11 +649,15 @@ public void testDeleteRecords() throws Exception { } //Ignoring test to be fixed on follow-up PR - @Ignore @Test - public void testListConsumerGroups() throws Exception { + public void testListConsumerGroups() { final HashMap<Integer, Node> nodes = new HashMap<>(); - nodes.put(0, new Node(0, "localhost", 8121)); + Node node0 = new Node(0, "localhost", 8121); + Node node1 = new Node(1, "localhost", 8122); + Node node2 = new Node(2, "localhost", 8123); + nodes.put(0, node0); + nodes.put(1, node1); + nodes.put(2, node2); final Cluster cluster = new Cluster( @@ -666,27 +672,72 @@ public void testListConsumerGroups() throws Exception { env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); - env.kafkaClient().prepareResponse( - new MetadataResponse( - env.cluster().nodes(), - env.cluster().clusterResource().clusterId(), - env.cluster().controller().id(), - new 
ArrayList<MetadataResponse.TopicMetadata>())); + List<MetadataResponse.PartitionMetadata> partitionMetadata = new ArrayList<>(); + partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node0, + Collections.singletonList(node0), Collections.singletonList(node0), Collections.<Node>emptyList())); + partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node1, + Collections.singletonList(node1), Collections.singletonList(node1), Collections.<Node>emptyList())); + partitionMetadata.add(new MetadataResponse.PartitionMetadata(Errors.NONE, 0, node2, + Collections.singletonList(node2), Collections.singletonList(node2), Collections.<Node>emptyList())); env.kafkaClient().prepareResponse( - new ListGroupsResponse( - Errors.NONE, - Arrays.asList( - new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE), - new ListGroupsResponse.Group("group-connect-1", "connector") - ))); + new MetadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata)))); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + Errors.NONE, + Arrays.asList( + new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE), + new ListGroupsResponse.Group("group-connect-1", "connector") + )), + node0); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + Errors.COORDINATOR_NOT_AVAILABLE, + Collections.<ListGroupsResponse.Group>emptyList() + ), + node1); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + Errors.NONE, + Arrays.asList( + new ListGroupsResponse.Group("group-2", ConsumerProtocol.PROTOCOL_TYPE), + new ListGroupsResponse.Group("group-connect-2", "connector") + )), + node2); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); - final List<ConsumerGroupListing> 
consumerGroups = new ArrayList<>(); - final KafkaFuture<Collection<ConsumerGroupListing>> listings = result.listings(); - consumerGroups.addAll(listings.get()); - assertEquals(1, consumerGroups.size()); + try { + Collection<ConsumerGroupListing> listing = result.all().get(); + fail("Expected to throw exception"); + } catch (Exception e) { + // this is good + } + + Iterator<KafkaFuture<ConsumerGroupListing>> iterator = result.iterator(); + int numListing = 0; + int numFailure = 0; + + while (iterator.hasNext()) { + KafkaFuture<ConsumerGroupListing> future = iterator.next(); + try { + ConsumerGroupListing listing = future.get(); + numListing++; + assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2")); + } catch (Exception e) { + numFailure++; + } + } + + assertEquals(2, numListing); + assertEquals(1, numFailure); } } @@ -746,17 +797,15 @@ public void testDescribeConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap)); final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0")); - final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture = result.describedGroups().get().get("group-0"); - final ConsumerGroupDescription groupDescription = groupDescriptionFuture.get(); + final ConsumerGroupDescription groupDescription = result.describedGroups().get("group-0").get(); - assertEquals(1, result.describedGroups().get().size()); + assertEquals(1, result.describedGroups().size()); assertEquals("group-0", groupDescription.groupId()); assertEquals(2, groupDescription.members().size()); } } @Test - @Ignore public void testDescribeConsumerGroupOffsets() throws Exception { final HashMap<Integer, Node> nodes = new HashMap<>(); nodes.put(0, new Node(0, "localhost", 8121)); @@ -787,12 +836,12 @@ public void testDescribeConsumerGroupOffsets() throws Exception { env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NONE, responseData)); final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0"); + final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get(); - assertEquals(3, result.partitionsToOffsetAndMetadata().get().size()); - final TopicPartition topicPartition = result.partitionsToOffsetAndMetadata().get().keySet().iterator().next(); - assertEquals("my_topic", topicPartition.topic()); - final OffsetAndMetadata offsetAndMetadata = result.partitionsToOffsetAndMetadata().get().values().iterator().next(); - assertEquals(10, offsetAndMetadata.offset()); + assertEquals(3, partitionToOffsetAndMetadata.size()); + assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset()); + assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset()); + assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset()); } } @@ -824,8 +873,8 @@ public void testDeleteConsumerGroups() throws Exception { final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds); - final Map<String, KafkaFuture<Void>> results = result.deletedGroups().get(); - assertNull(results.get("group-0").get()); + final KafkaFuture<Void> results = result.deletedGroups().get("group-0"); + assertNull(results.get()); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KIP-222: Add "describe consumer groups" and "list consumer groups" to > KafkaAdminClient > -------------------------------------------------------------------------------------- > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: Matthias J. Sax > Assignee: Jorge Quilcate > Priority: Major > Labels: kip-222 > Fix For: 1.2.0 > > > {{KafkaAdminClient}} does not allow getting information about consumer groups. > This feature is supported by the old {{kafka.admin.AdminClient}} though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroups()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)