This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new beac86f049 KAFKA-13043: Implement Admin APIs for offsetFetch batching
(#10964)
beac86f049 is described below
commit beac86f049385932309158c1cb49c8657e53f45f
Author: Sanjana Kaundinya <[email protected]>
AuthorDate: Thu Jul 14 05:47:34 2022 -0700
KAFKA-13043: Implement Admin APIs for offsetFetch batching (#10964)
This implements the AdminAPI portion of KIP-709:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258. The
request/response protocol changes were implemented in 3.0.0. A new batched API
has been introduced to list consumer offsets for different groups. For brokers
older than 3.0.0, separate requests are sent for each group.
Co-authored-by: Rajini Sivaram <[email protected]>
Co-authored-by: David Jacot <[email protected]>
Reviewers: David Jacot <[email protected]>, Rajini Sivaram
<[email protected]>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 36 ++-
.../kafka/clients/admin/KafkaAdminClient.java | 11 +-
.../admin/ListConsumerGroupOffsetsOptions.java | 14 +-
.../admin/ListConsumerGroupOffsetsResult.java | 56 +++-
.../admin/ListConsumerGroupOffsetsSpec.java | 79 ++++++
.../clients/admin/internals/AdminApiDriver.java | 3 +-
.../admin/internals/CoordinatorStrategy.java | 4 +
.../internals/ListConsumerGroupOffsetsHandler.java | 128 +++++----
.../kafka/common/requests/OffsetFetchResponse.java | 10 +-
.../kafka/clients/admin/AdminClientTestUtils.java | 12 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 206 ++++++++++++--
.../kafka/clients/admin/MockAdminClient.java | 16 +-
.../ListConsumerGroupOffsetsHandlerTest.java | 308 +++++++++++++++++++--
.../kafka/clients/consumer/KafkaConsumerTest.java | 6 +-
.../internals/ConsumerCoordinatorTest.java | 26 +-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 8 +-
.../kafka/admin/ConsumerGroupServiceTest.scala | 22 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 2 +-
.../processor/internals/StoreChangelogReader.java | 12 +-
.../internals/StoreChangelogReaderTest.java | 11 +-
20 files changed, 813 insertions(+), 157 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index fdacc09db8..0698d29702 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
import java.time.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -919,12 +920,20 @@ public interface Admin extends AutoCloseable {
* @param options The options to use when listing the consumer group
offsets.
* @return The ListGroupOffsetsResult
*/
- ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId,
ListConsumerGroupOffsetsOptions options);
+ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String
groupId, ListConsumerGroupOffsetsOptions options) {
+ ListConsumerGroupOffsetsOptions listOptions = new
ListConsumerGroupOffsetsOptions()
+ .requireStable(options.requireStable());
+ @SuppressWarnings("deprecation")
+ ListConsumerGroupOffsetsSpec groupSpec = new
ListConsumerGroupOffsetsSpec()
+ .topicPartitions(options.topicPartitions());
+ return listConsumerGroupOffsets(Collections.singletonMap(groupId,
groupSpec), listOptions);
+ }
/**
* List the consumer group offsets available in the cluster with the
default options.
* <p>
- * This is a convenience method for {@link
#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} with
default options.
+ * This is a convenience method for {@link #listConsumerGroupOffsets(Map,
ListConsumerGroupOffsetsOptions)}
+ * to list offsets of all partitions of one group with default options.
*
* @return The ListGroupOffsetsResult.
*/
@@ -932,6 +941,29 @@ public interface Admin extends AutoCloseable {
return listConsumerGroupOffsets(groupId, new
ListConsumerGroupOffsetsOptions());
}
+ /**
+ * List the consumer group offsets available in the cluster for the
specified consumer groups.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies
the topic partitions of the group to list offsets for.
+ *
+ * @param options The options to use when listing the consumer group
offsets.
+ * @return The ListConsumerGroupOffsetsResult
+ */
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions
options);
+
+ /**
+ * List the consumer group offsets available in the cluster for the
specified groups with the default options.
+ * <p>
+ * This is a convenience method for
+ * {@link #listConsumerGroupOffsets(Map, ListConsumerGroupOffsetsOptions)}
with default options.
+ *
+ * @param groupSpecs Map of consumer group ids to a spec that specifies
the topic partitions of the group to list offsets for.
+ * @return The ListConsumerGroupOffsetsResult.
+ */
+ default ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
+ return listConsumerGroupOffsets(groupSpecs, new
ListConsumerGroupOffsetsOptions());
+ }
+
/**
* Delete consumer groups from the cluster.
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2b2642e351..41eb27a1dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3401,13 +3401,14 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final
String groupId,
- final
ListConsumerGroupOffsetsOptions options) {
+ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs,
+
ListConsumerGroupOffsetsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> future =
- ListConsumerGroupOffsetsHandler.newFuture(groupId);
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, options.topicPartitions(),
options.requireStable(), logContext);
+ ListConsumerGroupOffsetsHandler.newFuture(groupSpecs.keySet());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(groupSpecs,
options.requireStable(), logContext);
invokeDriver(handler, future, options.timeoutMs);
- return new
ListConsumerGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)));
+ return new ListConsumerGroupOffsetsResult(future.all());
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
index 292a47ef39..44d3a40732 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -23,23 +23,28 @@ import
org.apache.kafka.common.annotation.InterfaceStability;
import java.util.List;
/**
- * Options for {@link Admin#listConsumerGroupOffsets(String)}.
+ * Options for {@link Admin#listConsumerGroupOffsets(java.util.Map)} and
{@link Admin#listConsumerGroupOffsets(String)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsOptions extends
AbstractOptions<ListConsumerGroupOffsetsOptions> {
- private List<TopicPartition> topicPartitions = null;
+ private List<TopicPartition> topicPartitions;
private boolean requireStable = false;
/**
* Set the topic partitions to list as part of the result.
* {@code null} includes all topic partitions.
+ * <p>
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map,
ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*
* @param topicPartitions List of topic partitions to include
* @return This ListGroupOffsetsOptions
*/
+ @Deprecated
public ListConsumerGroupOffsetsOptions
topicPartitions(List<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
@@ -55,7 +60,12 @@ public class ListConsumerGroupOffsetsOptions extends
AbstractOptions<ListConsume
/**
* Returns a list of topic partitions to add as part of the result.
+ * <p>
+ * @deprecated Since 3.3.
+ * Use {@link Admin#listConsumerGroupOffsets(java.util.Map,
ListConsumerGroupOffsetsOptions)}
+ * to specify topic partitions.
*/
+ @Deprecated
public List<TopicPartition> topicPartitions() {
return topicPartitions;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
index 48f4531418..2136e33a40 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -17,25 +17,32 @@
package org.apache.kafka.clients.admin;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
-import java.util.Map;
-
/**
- * The result of the {@link Admin#listConsumerGroupOffsets(String)} call.
+ * The result of the {@link Admin#listConsumerGroupOffsets(Map)} and
+ * {@link Admin#listConsumerGroupOffsets(String)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListConsumerGroupOffsetsResult {
- final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+ final Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>>
futures;
- ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>> future) {
- this.future = future;
+ ListConsumerGroupOffsetsResult(final Map<CoordinatorKey,
KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> futures) {
+ this.futures = futures.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().idValue,
Entry::getValue));
}
/**
@@ -43,7 +50,42 @@ public class ListConsumerGroupOffsetsResult {
* If the group does not have a committed offset for this partition, the
corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata() {
- return future;
+ if (futures.size() != 1) {
+ throw new IllegalStateException("Offsets from multiple consumer
groups were requested. " +
+ "Use partitionsToOffsetAndMetadata(groupId) instead to get
future for a specific group.");
+ }
+ return futures.values().iterator().next();
}
+ /**
+ * Return a future which yields a map of topic partitions to
OffsetAndMetadata objects for
+ * the specified group. If the group doesn't have a committed offset for a
specific
+ * partition, the corresponding value in the returned map will be null.
+ */
+ public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>
partitionsToOffsetAndMetadata(String groupId) {
+ if (!futures.containsKey(groupId))
+ throw new IllegalArgumentException("Offsets for consumer group '"
+ groupId + "' were not requested.");
+ return futures.get(groupId);
+ }
+
+ /**
+ * Return a future which yields all Map<String, Map<TopicPartition,
OffsetAndMetadata> objects,
+ * if requests for all the groups succeed.
+ */
+ public KafkaFuture<Map<String, Map<TopicPartition, OffsetAndMetadata>>>
all() {
+ return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture[0])).thenApply(
+ nil -> {
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
listedConsumerGroupOffsets = new HashMap<>(futures.size());
+ futures.forEach((key, future) -> {
+ try {
+ listedConsumerGroupOffsets.put(key, future.get());
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the
KafkaFuture#allOf already ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ });
+ return listedConsumerGroupOffsets;
+ });
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
new file mode 100644
index 0000000000..83858e49c8
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsSpec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Specification of consumer group offsets to list using {@link
Admin#listConsumerGroupOffsets(java.util.Map)}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
[email protected]
+public class ListConsumerGroupOffsetsSpec {
+
+ private Collection<TopicPartition> topicPartitions;
+
+ /**
+ * Set the topic partitions whose offsets are to be listed for a consumer
group.
+ * {@code null} includes all topic partitions.
+ *
+ * @param topicPartitions List of topic partitions to include
+ * @return This ListConsumerGroupOffsetSpec
+ */
+ public ListConsumerGroupOffsetsSpec
topicPartitions(Collection<TopicPartition> topicPartitions) {
+ this.topicPartitions = topicPartitions;
+ return this;
+ }
+
+ /**
+ * Returns the topic partitions whose offsets are to be listed for a
consumer group.
+ * {@code null} indicates that offsets of all partitions of the group are
to be listed.
+ */
+ public Collection<TopicPartition> topicPartitions() {
+ return topicPartitions;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ListConsumerGroupOffsetsSpec)) {
+ return false;
+ }
+ ListConsumerGroupOffsetsSpec that = (ListConsumerGroupOffsetsSpec) o;
+ return Objects.equals(topicPartitions, that.topicPartitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topicPartitions);
+ }
+
+ @Override
+ public String toString() {
+ return "ListConsumerGroupOffsetsSpec(" +
+ "topicPartitions=" + topicPartitions +
+ ')';
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
index d00db4b18c..0e1b03d964 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import
org.apache.kafka.common.requests.FindCoordinatorRequest.NoBatchedFindCoordinatorsException;
+import
org.apache.kafka.common.requests.OffsetFetchRequest.NoBatchedOffsetFetchRequestException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -253,7 +254,7 @@ public class AdminApiDriver<K, V> {
.collect(Collectors.toSet());
retryLookup(keysToUnmap);
- } else if (t instanceof NoBatchedFindCoordinatorsException) {
+ } else if (t instanceof NoBatchedFindCoordinatorsException || t
instanceof NoBatchedOffsetFetchRequestException) {
((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
Set<K> keysToUnmap = spec.keys.stream()
.filter(future.lookupKeys()::contains)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
index e6fc0d624a..02b68527c3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
@@ -120,6 +120,10 @@ public class CoordinatorStrategy implements
AdminApiLookupStrategy<CoordinatorKe
batch = false;
}
+ public boolean batch() {
+ return batch;
+ }
+
private CoordinatorKey requireSingletonAndType(Set<CoordinatorKey> keys) {
if (keys.size() != 1) {
throw new IllegalArgumentException("Unexpected size of key set:
expected 1, but got " + keys.size());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
index 08648821f7..21c7d8d488 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java
@@ -17,14 +17,16 @@
package org.apache.kafka.clients.admin.internals;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -36,39 +38,26 @@ import
org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-public class ListConsumerGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>
{
+public class ListConsumerGroupOffsetsHandler implements
AdminApiHandler<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> {
- private final CoordinatorKey groupId;
- private final List<TopicPartition> partitions;
private final boolean requireStable;
+ private final Map<String, ListConsumerGroupOffsetsSpec> groupSpecs;
private final Logger log;
- private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+ private final CoordinatorStrategy lookupStrategy;
public ListConsumerGroupOffsetsHandler(
- String groupId,
- List<TopicPartition> partitions,
- LogContext logContext
- ) {
- this(groupId, partitions, false, logContext);
- }
-
- public ListConsumerGroupOffsetsHandler(
- String groupId,
- List<TopicPartition> partitions,
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
boolean requireStable,
LogContext logContext
) {
- this.groupId = CoordinatorKey.byGroupId(groupId);
- this.partitions = partitions;
- this.requireStable = requireStable;
this.log = logContext.logger(ListConsumerGroupOffsetsHandler.class);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
+ this.groupSpecs = groupSpecs;
+ this.requireStable = requireStable;
}
- public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, OffsetAndMetadata>> newFuture(
- String groupId
- ) {
- return
AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, OffsetAndMetadata>> newFuture(Collection<String> groupIds) {
+ return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
}
@Override
@@ -82,16 +71,45 @@ public class ListConsumerGroupOffsetsHandler extends
AdminApiHandler.Batched<Coo
}
private void validateKeys(Set<CoordinatorKey> groupIds) {
- if (!groupIds.equals(Collections.singleton(groupId))) {
+ Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+ if (!keys.containsAll(groupIds)) {
throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
- " (expected only " + Collections.singleton(groupId) + ")");
+ " (expected one of " + keys + ")");
}
}
+ private static Set<CoordinatorKey> coordinatorKeys(Collection<String>
groupIds) {
+ return groupIds.stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ public OffsetFetchRequest.Builder buildBatchedRequest(Set<CoordinatorKey>
groupIds) {
+ // Create a map that only contains the consumer groups owned by the
coordinator.
+ Map<String, List<TopicPartition>> coordinatorGroupIdToTopicPartitions
= new HashMap<>(groupIds.size());
+ groupIds.forEach(g -> {
+ ListConsumerGroupOffsetsSpec spec = groupSpecs.get(g.idValue);
+ List<TopicPartition> partitions = spec.topicPartitions() != null ?
new ArrayList<>(spec.topicPartitions()) : null;
+ coordinatorGroupIdToTopicPartitions.put(g.idValue, partitions);
+ });
+
+ return new
OffsetFetchRequest.Builder(coordinatorGroupIdToTopicPartitions, requireStable,
false);
+ }
+
@Override
- public OffsetFetchRequest.Builder buildBatchedRequest(int coordinatorId,
Set<CoordinatorKey> groupIds) {
+ public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int
brokerId, Set<CoordinatorKey> groupIds) {
validateKeys(groupIds);
- return new OffsetFetchRequest.Builder(groupId.idValue, requireStable,
partitions, false);
+
+ // When the OffsetFetchRequest fails with
NoBatchedOffsetFetchRequestException, we completely disable
+ // the batching end-to-end, including the FindCoordinatorRequest.
+ if (lookupStrategy.batch()) {
+ return Collections.singletonList(new
RequestAndKeys<>(buildBatchedRequest(groupIds), groupIds));
+ } else {
+ return groupIds.stream().map(groupId -> {
+ Set<CoordinatorKey> keys = Collections.singleton(groupId);
+ return new RequestAndKeys<>(buildBatchedRequest(keys), keys);
+ }).collect(Collectors.toList());
+ }
}
@Override
@@ -104,44 +122,46 @@ public class ListConsumerGroupOffsetsHandler extends
AdminApiHandler.Batched<Coo
final OffsetFetchResponse response = (OffsetFetchResponse)
abstractResponse;
- // the groupError will contain the group level error for v0-v8
OffsetFetchResponse
- Errors groupError = response.groupLevelError(groupId.idValue);
- if (groupError != Errors.NONE) {
- final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
- final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
-
- handleGroupError(groupId, groupError, failed, groupsToUnmap);
-
- return new ApiResult<>(Collections.emptyMap(), failed, new
ArrayList<>(groupsToUnmap));
- } else {
- final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing =
new HashMap<>();
-
-
response.partitionDataMap(groupId.idValue).forEach((topicPartition,
partitionData) -> {
- final Errors error = partitionData.error;
- if (error == Errors.NONE) {
- final long offset = partitionData.offset;
- final String metadata = partitionData.metadata;
- final Optional<Integer> leaderEpoch =
partitionData.leaderEpoch;
- // Negative offset indicates that the group has no
committed offset for this partition
- if (offset < 0) {
- groupOffsetsListing.put(topicPartition, null);
+ Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed
= new HashMap<>();
+ Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ List<CoordinatorKey> unmapped = new ArrayList<>();
+ for (CoordinatorKey coordinatorKey : groupIds) {
+ String group = coordinatorKey.idValue;
+ if (response.groupHasError(group)) {
+ handleGroupError(CoordinatorKey.byGroupId(group),
response.groupLevelError(group), failed, unmapped);
+ } else {
+ final Map<TopicPartition, OffsetAndMetadata>
groupOffsetsListing = new HashMap<>();
+ Map<TopicPartition, OffsetFetchResponse.PartitionData>
responseData = response.partitionDataMap(group);
+ for (Map.Entry<TopicPartition,
OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) {
+ final TopicPartition topicPartition =
partitionEntry.getKey();
+ OffsetFetchResponse.PartitionData partitionData =
partitionEntry.getValue();
+ final Errors error = partitionData.error;
+
+ if (error == Errors.NONE) {
+ final long offset = partitionData.offset;
+ final String metadata = partitionData.metadata;
+ final Optional<Integer> leaderEpoch =
partitionData.leaderEpoch;
+ // Negative offset indicates that the group has no
committed offset for this partition
+ if (offset < 0) {
+ groupOffsetsListing.put(topicPartition, null);
+ } else {
+ groupOffsetsListing.put(topicPartition, new
OffsetAndMetadata(offset, leaderEpoch, metadata));
+ }
} else {
- groupOffsetsListing.put(topicPartition, new
OffsetAndMetadata(offset, leaderEpoch, metadata));
+ log.warn("Skipping return offset for {} due to error
{}.", topicPartition, error);
}
- } else {
- log.warn("Skipping return offset for {} due to error {}.",
topicPartition, error);
}
- });
-
- return ApiResult.completed(groupId, groupOffsetsListing);
+ completed.put(CoordinatorKey.byGroupId(group),
groupOffsetsListing);
+ }
}
+ return new ApiResult<>(completed, failed, unmapped);
}
private void handleGroupError(
CoordinatorKey groupId,
Errors error,
Map<CoordinatorKey, Throwable> failed,
- Set<CoordinatorKey> groupsToUnmap
+ List<CoordinatorKey> groupsToUnmap
) {
switch (error) {
case GROUP_AUTHORIZATION_FAILED:
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 213182ec8c..4e25984668 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -173,8 +173,8 @@ public class OffsetFetchResponse extends AbstractResponse {
* @param responseData Fetched offset information grouped by
topic-partition and by group
*/
public OffsetFetchResponse(int throttleTimeMs,
- Map<String, Errors> errors, Map<String,
- Map<TopicPartition, PartitionData>>
responseData) {
+ Map<String, Errors> errors,
+ Map<String, Map<TopicPartition, PartitionData>>
responseData) {
super(ApiKeys.OFFSET_FETCH);
List<OffsetFetchResponseGroup> groupList = new ArrayList<>();
for (Entry<String, Map<TopicPartition, PartitionData>> entry :
responseData.entrySet()) {
@@ -250,7 +250,11 @@ public class OffsetFetchResponse extends AbstractResponse {
}
public boolean groupHasError(String groupId) {
- return groupLevelErrors.get(groupId) != Errors.NONE;
+ Errors error = groupLevelErrors.get(groupId);
+ if (error == null) {
+ return this.error != null && this.error != Errors.NONE;
+ }
+ return error != Errors.NONE;
}
public Errors error() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index 6f98a166b1..d8b9f427d6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import org.apache.kafka.clients.HostResolver;
import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
import org.apache.kafka.clients.admin.internals.MetadataOperationContext;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -104,14 +105,17 @@ public class AdminClientTestUtils {
.collect(Collectors.toMap(Map.Entry::getKey, e ->
KafkaFuture.completedFuture(e.getValue()))));
}
- public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(Map<TopicPartition, OffsetAndMetadata> offsets) {
- return new
ListConsumerGroupOffsetsResult(KafkaFuture.completedFuture(offsets));
+ public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition,
OffsetAndMetadata>> offsets) {
+ Map<CoordinatorKey, KafkaFuture<Map<TopicPartition,
OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
+ .collect(Collectors.toMap(e ->
CoordinatorKey.byGroupId(e.getKey()),
+ e ->
KafkaFutureImpl.completedFuture(e.getValue())));
+ return new ListConsumerGroupOffsetsResult(resultMap);
}
- public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(KafkaException exception) {
+ public static ListConsumerGroupOffsetsResult
listConsumerGroupOffsetsResult(String group, KafkaException exception) {
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future =
new KafkaFutureImpl<>();
future.completeExceptionally(exception);
- return new ListConsumerGroupOffsetsResult(future);
+ return new
ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group),
future));
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 61a2aaa00b..3d285a45f7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -108,6 +108,7 @@ import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import
org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
import
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
@@ -192,6 +193,7 @@ import
org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
@@ -224,6 +226,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -266,6 +269,7 @@ import static org.junit.jupiter.api.Assertions.fail;
public class KafkaAdminClientTest {
private static final Logger log =
LoggerFactory.getLogger(KafkaAdminClientTest.class);
private static final String GROUP_ID = "group-0";
+ private static final int THROTTLE = 10;
@Test
public void testDefaultApiTimeoutAndRequestTimeoutConflicts() {
@@ -501,6 +505,21 @@ public class KafkaAdminClientTest {
return FindCoordinatorResponse.prepareOldResponse(error, node);
}
+ private static FindCoordinatorResponse
prepareBatchedFindCoordinatorResponse(Errors error, Node node,
Collection<String> groups) {
+ FindCoordinatorResponseData data = new FindCoordinatorResponseData();
+ List<FindCoordinatorResponseData.Coordinator> coordinators =
groups.stream()
+ .map(group -> new FindCoordinatorResponseData.Coordinator()
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message())
+ .setKey(group)
+ .setHost(node.host())
+ .setPort(node.port())
+ .setNodeId(node.id()))
+ .collect(Collectors.toList());
+ data.setCoordinators(coordinators);
+ return new FindCoordinatorResponse(data);
+ }
+
private static MetadataResponse prepareMetadataResponse(Cluster cluster,
Errors error) {
List<MetadataResponseTopic> metadata = new ArrayList<>();
for (String topic : cluster.topics()) {
@@ -3067,9 +3086,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
final TopicPartition tp1 = new TopicPartition("A", 0);
- final ListConsumerGroupOffsetsOptions options = new
ListConsumerGroupOffsetsOptions();
-
options.topicPartitions(Collections.singletonList(tp1)).requireStable(true);
- final ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(GROUP_ID, options);
+ final ListConsumerGroupOffsetsOptions options = new
ListConsumerGroupOffsetsOptions()
+ .requireStable(true);
+ final ListConsumerGroupOffsetsSpec groupSpec = new
ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Collections.singletonList(tp1));
+
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID,
groupSpec), options);
final MockClient mockClient = env.kafkaClient();
TestUtils.waitForCondition(() -> {
@@ -3077,11 +3098,11 @@ public class KafkaAdminClientTest {
if (clientRequest != null) {
OffsetFetchRequestData data =
((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
return data.requireStable() &&
- data.topics().get(0).name().equals("A") &&
-
data.topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
+
data.groups().get(0).topics().get(0).name().equals("A") &&
+
data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
}
return false;
- }, "Failed awaiting ListConsumerGroupOffsets request");
+ }, "Failed awaiting ListConsumerGroupOfsets request");
}
}
@@ -3095,12 +3116,11 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
- env.kafkaClient().prepareResponse(new
OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
final ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
-
TestUtils.assertFutureError(result.partitionsToOffsetAndMetadata(),
TimeoutException.class);
}
}
@@ -3124,16 +3144,16 @@ public class KafkaAdminClientTest {
mockClient.prepareResponse(body -> {
firstAttemptTime.set(time.milliseconds());
return true;
- }, new OffsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
+ }, offsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
mockClient.prepareResponse(body -> {
secondAttemptTime.set(time.milliseconds());
return true;
- }, new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+ }, offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
- final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future =
env.adminClient().listConsumerGroupOffsets("group-0").partitionsToOffsetAndMetadata();
+ final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future =
env.adminClient().listConsumerGroupOffsets(GROUP_ID).partitionsToOffsetAndMetadata();
TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses()
== 1, "Failed awaiting ListConsumerGroupOffsets first request failure");
TestUtils.waitForCondition(() -> ((KafkaAdminClient)
env.adminClient()).numPendingCalls() == 1, "Failed to add retry
ListConsumerGroupOffsets call on first failure");
@@ -3157,7 +3177,8 @@ public class KafkaAdminClientTest {
prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Collections.emptyMap()));
+ offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Collections.emptyMap()));
+
/*
* We need to return two responses here, one for NOT_COORDINATOR
call when calling list consumer offsets
* api using coordinator that has moved. This will retry whole
operation. So we need to again respond with a
@@ -3166,19 +3187,19 @@ public class KafkaAdminClientTest {
* And the same reason for the following COORDINATOR_NOT_AVAILABLE
error response
*/
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
+ offsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyMap()));
+ offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyMap()));
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(Errors.NONE, Collections.emptyMap()));
+ offsetFetchResponse(Errors.NONE, Collections.emptyMap()));
final ListConsumerGroupOffsetsResult errorResult1 =
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
@@ -3199,8 +3220,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(
prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
- env.kafkaClient().prepareResponse(
- new OffsetFetchResponse(error, Collections.emptyMap()));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(error,
Collections.emptyMap()));
ListConsumerGroupOffsetsResult errorResult =
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
@@ -3220,7 +3240,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
// Retriable errors should be retried
- env.kafkaClient().prepareResponse(new
OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Collections.emptyMap()));
+
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Collections.emptyMap()));
/*
* We need to return two responses here, one for NOT_COORDINATOR
error when calling list consumer group offsets
@@ -3229,10 +3249,10 @@ public class KafkaAdminClientTest {
*
* And the same reason for the following COORDINATOR_NOT_AVAILABLE
error response
*/
- env.kafkaClient().prepareResponse(new
OffsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
+
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
- env.kafkaClient().prepareResponse(new
OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyMap()));
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
TopicPartition myTopicPartition0 = new TopicPartition("my_topic",
0);
@@ -3249,7 +3269,7 @@ public class KafkaAdminClientTest {
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition3, new
OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.NONE));
- env.kafkaClient().prepareResponse(new
OffsetFetchResponse(Errors.NONE, responseData));
+ env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.NONE,
responseData));
final ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(GROUP_ID);
final Map<TopicPartition, OffsetAndMetadata>
partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();
@@ -3263,6 +3283,144 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testBatchedListConsumerGroupOffsets() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
env.cluster().controller(), groupSpecs.keySet()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs, new
ListConsumerGroupOffsetsOptions());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true,
Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void
testBatchedListConsumerGroupOffsetsWithNoFindCoordinatorBatching() throws
Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion findCoordinatorV3 = new ApiVersion()
+ .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 3);
+ ApiVersion offsetFetchV7 = new ApiVersion()
+ .setApiKey(ApiKeys.OFFSET_FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 7);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
offsetFetchV7)));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode()));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+ // Fail the first request in order to ensure that the group is not
batched when retried.
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false,
Errors.COORDINATOR_LOAD_IN_PROGRESS);
+
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false,
Errors.NONE);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false,
Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void testBatchedListConsumerGroupOffsetsWithNoOffsetFetchBatching()
throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion offsetFetchV7 = new ApiVersion()
+ .setApiKey(ApiKeys.OFFSET_FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 7);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singleton(offsetFetchV7)));
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
env.cluster().controller(), groupSpecs.keySet()));
+ // Prepare a response to force client to attempt batched request
creation that throws
+ // NoBatchedOffsetFetchRequestException. This triggers creation of
non-batched requests.
+
env.kafkaClient().prepareResponse(offsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Collections.emptyMap()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+
+ // The request handler attempts both FindCoordinator and
OffsetFetch requests. This seems
+ // ok since since we expect this scenario only during upgrades
from versions < 3.0.0 where
+ // some upgraded brokers could handle batched FindCoordinator
while non-upgraded coordinators
+ // rejected batched OffsetFetch requests.
+ sendFindCoordinatorResponse(env.kafkaClient(),
env.cluster().controller());
+ sendFindCoordinatorResponse(env.kafkaClient(),
env.cluster().controller());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false,
Errors.NONE);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false,
Errors.NONE);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ private Map<String, ListConsumerGroupOffsetsSpec>
batchedListConsumerGroupOffsetsSpec() {
+ Set<TopicPartition> groupAPartitions = Collections.singleton(new
TopicPartition("A", 1));
+ Set<TopicPartition> groupBPartitions = Collections.singleton(new
TopicPartition("B", 2));
+
+ ListConsumerGroupOffsetsSpec groupASpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+ ListConsumerGroupOffsetsSpec groupBSpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+ return Utils.mkMap(Utils.mkEntry("groupA", groupASpec),
Utils.mkEntry("groupB", groupBSpec));
+ }
+
+ private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws
Exception {
+ TestUtils.waitForCondition(() -> {
+ ClientRequest clientRequest = mockClient.requests().peek();
+ return clientRequest != null && clientRequest.apiKey() == apiKeys;
+ }, "Failed awaiting " + apiKeys + " request");
+ }
+
+ private void sendFindCoordinatorResponse(MockClient mockClient, Node
coordinator) throws Exception {
+ waitForRequest(mockClient, ApiKeys.FIND_COORDINATOR);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ FindCoordinatorRequestData data = ((FindCoordinatorRequest.Builder)
clientRequest.requestBuilder()).data();
+ mockClient.respond(prepareFindCoordinatorResponse(Errors.NONE,
data.key(), coordinator));
+ }
+
+ private void sendOffsetFetchResponse(MockClient mockClient, Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched, Errors error) throws
Exception {
+ waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).data;
+ Map<String, Map<TopicPartition, PartitionData>> results = new
HashMap<>();
+ Map<String, Errors> errors = new HashMap<>();
+ data.groups().forEach(group -> {
+ Map<TopicPartition, PartitionData> partitionResults = new
HashMap<>();
+ for (TopicPartition tp :
groupSpecs.get(group.groupId()).topicPartitions()) {
+ partitionResults.put(tp, new PartitionData(10,
Optional.empty(), "", Errors.NONE));
+ }
+ results.put(group.groupId(), partitionResults);
+ errors.put(group.groupId(), error);
+ });
+ if (!batched) {
+ assertEquals(1, data.groups().size());
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, error,
results.values().iterator().next()));
+ } else
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, errors,
results));
+ }
+
+ private void verifyListOffsetsForMultipleGroups(Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs,
+
ListConsumerGroupOffsetsResult result) throws Exception {
+ assertEquals(groupSpecs.size(), result.all().get(10,
TimeUnit.SECONDS).size());
+ for (Map.Entry<String, ListConsumerGroupOffsetsSpec> entry :
groupSpecs.entrySet()) {
+ assertEquals(entry.getValue().topicPartitions(),
+
result.partitionsToOffsetAndMetadata(entry.getKey()).get().keySet());
+ }
+ }
+
@Test
public void testDeleteConsumerGroupsNumRetries() throws Exception {
final Cluster cluster = mockCluster(3, 0);
@@ -6544,6 +6702,12 @@ public class KafkaAdminClientTest {
.setLogDir(logDir))));
}
+ private OffsetFetchResponse offsetFetchResponse(Errors error,
Map<TopicPartition, PartitionData> responseData) {
+ return new OffsetFetchResponse(THROTTLE,
+ Collections.singletonMap(GROUP_ID,
error),
+ Collections.singletonMap(GROUP_ID,
responseData));
+ }
+
private static MemberDescription
convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) {
return new MemberDescription(member.memberId(),
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index ef858c5003..8c31c7cf69 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;
import
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
@@ -583,24 +584,29 @@ public class MockAdminClient extends AdminClient {
}
@Override
- synchronized public ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions
options) {
- // ignoring the groupId and assume one test would only work on one
group only
+ synchronized public ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
ListConsumerGroupOffsetsOptions options) {
+ // ignoring the groups and assume one test would only work on one
group only
+ if (groupSpecs.size() != 1)
+ throw new UnsupportedOperationException("Not implemented yet");
+
+ String group = groupSpecs.keySet().iterator().next();
+ Collection<TopicPartition> topicPartitions =
groupSpecs.get(group).topicPartitions();
final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> future =
new KafkaFutureImpl<>();
if (listConsumerGroupOffsetsException != null) {
future.completeExceptionally(listConsumerGroupOffsetsException);
} else {
- if (options.topicPartitions().isEmpty()) {
+ if (topicPartitions.isEmpty()) {
future.complete(committedOffsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
new OffsetAndMetadata(entry.getValue()))));
} else {
future.complete(committedOffsets.entrySet().stream()
- .filter(entry ->
options.topicPartitions().contains(entry.getKey()))
+ .filter(entry ->
topicPartitions.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, entry ->
new OffsetAndMetadata(entry.getValue()))));
}
}
- return new ListConsumerGroupOffsetsResult(future);
+ return new
ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group),
future));
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
index 27597ce035..95fabb3fc2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandlerTest.java
@@ -24,52 +24,140 @@ import static
org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.RequestAndKeys;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
public class ListConsumerGroupOffsetsHandlerTest {
private final LogContext logContext = new LogContext();
- private final String groupId = "group-id";
+ private final int throttleMs = 10;
+ private final String groupZero = "group0";
+ private final String groupOne = "group1";
+ private final String groupTwo = "group2";
+ private final List<String> groups = Arrays.asList(groupZero, groupOne,
groupTwo);
private final TopicPartition t0p0 = new TopicPartition("t0", 0);
private final TopicPartition t0p1 = new TopicPartition("t0", 1);
private final TopicPartition t1p0 = new TopicPartition("t1", 0);
private final TopicPartition t1p1 = new TopicPartition("t1", 1);
- private final List<TopicPartition> tps = Arrays.asList(t0p0, t0p1, t1p0,
t1p1);
+ private final TopicPartition t2p0 = new TopicPartition("t2", 0);
+ private final TopicPartition t2p1 = new TopicPartition("t2", 1);
+ private final TopicPartition t2p2 = new TopicPartition("t2", 2);
+ private final Map<String, ListConsumerGroupOffsetsSpec> singleRequestMap =
Collections.singletonMap(groupZero,
+ new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t0p1, t1p0,
t1p1)));
+ private final Map<String, ListConsumerGroupOffsetsSpec> batchedRequestMap =
+ new HashMap<String, ListConsumerGroupOffsetsSpec>() {{
+ put(groupZero, new
ListConsumerGroupOffsetsSpec().topicPartitions(singletonList(t0p0)));
+ put(groupOne, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0,
t1p1)));
+ put(groupTwo, new
ListConsumerGroupOffsetsSpec().topicPartitions(Arrays.asList(t0p0, t1p0, t1p1,
t2p0, t2p1, t2p2)));
+ }};
@Test
public void testBuildRequest() {
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
- OffsetFetchRequest request = handler.buildBatchedRequest(1,
singleton(CoordinatorKey.byGroupId(groupId))).build();
- assertEquals(groupId, request.data().groups().get(0).groupId());
+ ListConsumerGroupOffsetsHandler handler =
+ new ListConsumerGroupOffsetsHandler(singleRequestMap, false,
logContext);
+ OffsetFetchRequest request =
handler.buildBatchedRequest(coordinatorKeys(groupZero)).build();
+ assertEquals(groupZero, request.data().groups().get(0).groupId());
assertEquals(2, request.data().groups().get(0).topics().size());
assertEquals(2,
request.data().groups().get(0).topics().get(0).partitionIndexes().size());
assertEquals(2,
request.data().groups().get(0).topics().get(1).partitionIndexes().size());
}
+ @Test
+ public void testBuildRequestWithMultipleGroups() {
+ Map<String, ListConsumerGroupOffsetsSpec> requestMap = new
HashMap<>(this.batchedRequestMap);
+ String groupThree = "group3";
+ requestMap.put(groupThree, new ListConsumerGroupOffsetsSpec()
+ .topicPartitions(Arrays.asList(new TopicPartition("t3", 0),
new TopicPartition("t3", 1))));
+
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(requestMap, false, logContext);
+ OffsetFetchRequest request1 =
handler.buildBatchedRequest(coordinatorKeys(groupZero, groupOne,
groupTwo)).build();
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo),
requestGroups(request1));
+
+ OffsetFetchRequest request2 =
handler.buildBatchedRequest(coordinatorKeys(groupThree)).build();
+ assertEquals(Utils.mkSet(groupThree), requestGroups(request2));
+
+ Map<String, ListConsumerGroupOffsetsSpec> builtRequests = new
HashMap<>();
+ request1.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+ request2.groupIdsToPartitions().forEach((group, partitions) ->
+ builtRequests.put(group, new
ListConsumerGroupOffsetsSpec().topicPartitions(partitions)));
+
+ assertEquals(requestMap, builtRequests);
+ Map<String, List<OffsetFetchRequestTopics>> groupIdsToTopics =
request1.groupIdsToTopics();
+
+ assertEquals(3, groupIdsToTopics.size());
+ assertEquals(1, groupIdsToTopics.get(groupZero).size());
+ assertEquals(2, groupIdsToTopics.get(groupOne).size());
+ assertEquals(3, groupIdsToTopics.get(groupTwo).size());
+
+ assertEquals(1,
groupIdsToTopics.get(groupZero).get(0).partitionIndexes().size());
+ assertEquals(1,
groupIdsToTopics.get(groupOne).get(0).partitionIndexes().size());
+ assertEquals(2,
groupIdsToTopics.get(groupOne).get(1).partitionIndexes().size());
+ assertEquals(1,
groupIdsToTopics.get(groupTwo).get(0).partitionIndexes().size());
+ assertEquals(2,
groupIdsToTopics.get(groupTwo).get(1).partitionIndexes().size());
+ assertEquals(3,
groupIdsToTopics.get(groupTwo).get(2).partitionIndexes().size());
+
+ groupIdsToTopics = request2.groupIdsToTopics();
+ assertEquals(1, groupIdsToTopics.size());
+ assertEquals(1, groupIdsToTopics.get(groupThree).size());
+ assertEquals(2,
groupIdsToTopics.get(groupThree).get(0).partitionIndexes().size());
+ }
+
+ @Test
+ public void testBuildRequestBatchGroups() {
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+ Collection<RequestAndKeys<CoordinatorKey>> requests =
handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+ assertEquals(1, requests.size());
+ assertEquals(Utils.mkSet(groupZero, groupOne, groupTwo),
requestGroups((OffsetFetchRequest) requests.iterator().next().request.build()));
+ }
+
+ @Test
+ public void testBuildRequestDoesNotBatchGroup() {
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(batchedRequestMap, false, logContext);
+ // Disable batching.
+ ((CoordinatorStrategy) handler.lookupStrategy()).disableBatch();
+ Collection<RequestAndKeys<CoordinatorKey>> requests =
handler.buildRequest(1, coordinatorKeys(groupZero, groupOne, groupTwo));
+ assertEquals(3, requests.size());
+ assertEquals(
+ Utils.mkSet(Utils.mkSet(groupZero), Utils.mkSet(groupOne),
Utils.mkSet(groupTwo)),
+ requests.stream().map(requestAndKey ->
requestGroups((OffsetFetchRequest)
requestAndKey.request.build())).collect(Collectors.toSet())
+ );
+ }
+
@Test
public void testSuccessfulHandleResponse() {
Map<TopicPartition, OffsetAndMetadata> expected = new HashMap<>();
assertCompleted(handleWithError(Errors.NONE), expected);
}
-
@Test
public void testSuccessfulHandleResponseWithOnePartitionError() {
Map<TopicPartition, OffsetAndMetadata> expectedResult =
Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
@@ -80,17 +168,62 @@ public class ListConsumerGroupOffsetsHandlerTest {
assertCompleted(handleWithPartitionError(Errors.UNSTABLE_OFFSET_COMMIT),
expectedResult);
}
+ @Test
+ public void
testSuccessfulHandleResponseWithOnePartitionErrorWithMultipleGroups() {
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapZero =
+ Collections.singletonMap(t0p0, new OffsetAndMetadata(10L));
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapOne =
+ Collections.singletonMap(t1p1, new OffsetAndMetadata(10L));
+ Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMapTwo =
+ Collections.singletonMap(t2p2, new OffsetAndMetadata(10L));
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expectedResult =
+ new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>() {{
+ put(groupZero, offsetAndMetadataMapZero);
+ put(groupOne, offsetAndMetadataMapOne);
+ put(groupTwo, offsetAndMetadataMapTwo);
+ }};
+
+ assertCompletedForMultipleGroups(
+
handleWithPartitionErrorMultipleGroups(Errors.UNKNOWN_TOPIC_OR_PARTITION),
expectedResult);
+ assertCompletedForMultipleGroups(
+
handleWithPartitionErrorMultipleGroups(Errors.TOPIC_AUTHORIZATION_FAILED),
expectedResult);
+ assertCompletedForMultipleGroups(
+
handleWithPartitionErrorMultipleGroups(Errors.UNSTABLE_OFFSET_COMMIT),
expectedResult);
+ }
+
+ @Test
+ public void testSuccessfulHandleResponseWithMultipleGroups() {
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expected = new
HashMap<>();
+ Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+
assertCompletedForMultipleGroups(handleWithErrorWithMultipleGroups(errorMap,
batchedRequestMap), expected);
+ }
+
@Test
public void testUnmappedHandleResponse() {
assertUnmapped(handleWithError(Errors.COORDINATOR_NOT_AVAILABLE));
assertUnmapped(handleWithError(Errors.NOT_COORDINATOR));
}
+ @Test
+ public void testUnmappedHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = new HashMap<>();
+ errorMap.put(groupZero, Errors.NOT_COORDINATOR);
+ errorMap.put(groupOne, Errors.COORDINATOR_NOT_AVAILABLE);
+ errorMap.put(groupTwo, Errors.NOT_COORDINATOR);
+
assertUnmappedWithMultipleGroups(handleWithErrorWithMultipleGroups(errorMap,
batchedRequestMap));
+ }
+
@Test
public void testRetriableHandleResponse() {
assertRetriable(handleWithError(Errors.COORDINATOR_LOAD_IN_PROGRESS));
}
+ @Test
+ public void testRetriableHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = errorMap(groups,
Errors.COORDINATOR_LOAD_IN_PROGRESS);
+ assertRetriable(handleWithErrorWithMultipleGroups(errorMap,
batchedRequestMap));
+ }
+
@Test
public void testFailedHandleResponse() {
assertFailed(GroupAuthorizationException.class,
handleWithError(Errors.GROUP_AUTHORIZATION_FAILED));
@@ -98,10 +231,50 @@ public class ListConsumerGroupOffsetsHandlerTest {
assertFailed(InvalidGroupIdException.class,
handleWithError(Errors.INVALID_GROUP_ID));
}
+ @Test
+ public void testFailedHandleResponseWithMultipleGroups() {
+ Map<String, Errors> errorMap = new HashMap<>();
+ errorMap.put(groupZero, Errors.GROUP_AUTHORIZATION_FAILED);
+ errorMap.put(groupOne, Errors.GROUP_ID_NOT_FOUND);
+ errorMap.put(groupTwo, Errors.INVALID_GROUP_ID);
+ Map<String, Class<? extends Throwable>> groupToExceptionMap = new
HashMap<>();
+ groupToExceptionMap.put(groupZero, GroupAuthorizationException.class);
+ groupToExceptionMap.put(groupOne, GroupIdNotFoundException.class);
+ groupToExceptionMap.put(groupTwo, InvalidGroupIdException.class);
+ assertFailedForMultipleGroups(groupToExceptionMap,
+ handleWithErrorWithMultipleGroups(errorMap, batchedRequestMap));
+ }
+
private OffsetFetchResponse buildResponse(Errors error) {
- Map<TopicPartition, PartitionData> responseData = new HashMap<>();
- OffsetFetchResponse response = new OffsetFetchResponse(error,
responseData);
- return response;
+ return new OffsetFetchResponse(
+ throttleMs,
+ Collections.singletonMap(groupZero, error),
+ Collections.singletonMap(groupZero, new HashMap<>()));
+ }
+
+ private OffsetFetchResponse buildResponseWithMultipleGroups(
+ Map<String, Errors> errorMap,
+ Map<String, Map<TopicPartition, PartitionData>> responseData
+ ) {
+ return new OffsetFetchResponse(throttleMs, errorMap, responseData);
+ }
+
+ private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> handleWithErrorWithMultipleGroups(
+ Map<String, Errors> errorMap,
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs
+ ) {
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupSpecs, false, logContext);
+ Map<String, Map<TopicPartition, PartitionData>> responseData = new
HashMap<>();
+ for (String group : errorMap.keySet()) {
+ responseData.put(group, new HashMap<>());
+ }
+ OffsetFetchResponse response =
buildResponseWithMultipleGroups(errorMap, responseData);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ errorMap.keySet()
+ .stream()
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet()),
+ response);
}
private OffsetFetchResponse buildResponseWithPartitionError(Errors error) {
@@ -110,24 +283,68 @@ public class ListConsumerGroupOffsetsHandlerTest {
responseData.put(t0p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", Errors.NONE));
responseData.put(t0p1, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
- OffsetFetchResponse response = new OffsetFetchResponse(Errors.NONE,
responseData);
- return response;
+ return new OffsetFetchResponse(Errors.NONE, responseData);
+ }
+
+ private OffsetFetchResponse
buildResponseWithPartitionErrorWithMultipleGroups(Errors error) {
+ Map<TopicPartition, PartitionData> responseDataZero = new HashMap<>();
+ responseDataZero.put(t0p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", Errors.NONE));
+
+ Map<TopicPartition, PartitionData> responseDataOne = new HashMap<>();
+ responseDataOne.put(t0p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataOne.put(t1p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataOne.put(t1p1, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", Errors.NONE));
+
+ Map<TopicPartition, PartitionData> responseDataTwo = new HashMap<>();
+ responseDataTwo.put(t0p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataTwo.put(t1p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataTwo.put(t1p1, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataTwo.put(t2p0, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataTwo.put(t2p1, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", error));
+ responseDataTwo.put(t2p2, new OffsetFetchResponse.PartitionData(10,
Optional.empty(), "", Errors.NONE));
+
+ Map<String, Map<TopicPartition, PartitionData>> responseData =
+ new HashMap<String, Map<TopicPartition, PartitionData>>() {{
+ put(groupZero, responseDataZero);
+ put(groupOne, responseDataOne);
+ put(groupTwo, responseDataTwo);
+ }};
+
+ Map<String, Errors> errorMap = errorMap(groups, Errors.NONE);
+ return new OffsetFetchResponse(0, errorMap, responseData);
}
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> handleWithPartitionError(
Errors error
) {
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(singleRequestMap,
+ false, logContext);
OffsetFetchResponse response = buildResponseWithPartitionError(error);
- return handler.handleResponse(new Node(1, "host", 1234),
singleton(CoordinatorKey.byGroupId(groupId)), response);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ singleton(CoordinatorKey.byGroupId(groupZero)), response);
+ }
+
+ private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> handleWithPartitionErrorMultipleGroups(
+ Errors error
+ ) {
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(
+ batchedRequestMap, false, logContext);
+ OffsetFetchResponse response =
buildResponseWithPartitionErrorWithMultipleGroups(error);
+ return handler.handleResponse(
+ new Node(1, "host", 1234),
+ coordinatorKeys(groupZero, groupOne, groupTwo),
+ response);
}
private AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> handleWithError(
Errors error
) {
- ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(groupId, tps, logContext);
+ ListConsumerGroupOffsetsHandler handler = new
ListConsumerGroupOffsetsHandler(
+ singleRequestMap, false, logContext);
OffsetFetchResponse response = buildResponse(error);
- return handler.handleResponse(new Node(1, "host", 1234),
singleton(CoordinatorKey.byGroupId(groupId)), response);
+ return handler.handleResponse(new Node(1, "host", 1234),
+ singleton(CoordinatorKey.byGroupId(groupZero)),
+ response);
}
private void assertUnmapped(
@@ -135,11 +352,19 @@ public class ListConsumerGroupOffsetsHandlerTest {
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
- assertEquals(singletonList(CoordinatorKey.byGroupId(groupId)),
result.unmappedKeys);
+ assertEquals(singletonList(CoordinatorKey.byGroupId(groupZero)),
result.unmappedKeys);
+ }
+
+ private void assertUnmappedWithMultipleGroups(
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result
+ ) {
+ assertEquals(emptySet(), result.completedKeys.keySet());
+ assertEquals(emptySet(), result.failedKeys.keySet());
+ assertEquals(coordinatorKeys(groupZero, groupOne, groupTwo), new
HashSet<>(result.unmappedKeys));
}
private void assertRetriable(
- AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result
) {
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptySet(), result.failedKeys.keySet());
@@ -150,21 +375,64 @@ public class ListConsumerGroupOffsetsHandlerTest {
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result,
Map<TopicPartition, OffsetAndMetadata> expected
) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+ CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
assertEquals(emptySet(), result.failedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.completedKeys.keySet());
- assertEquals(expected,
result.completedKeys.get(CoordinatorKey.byGroupId(groupId)));
+ assertEquals(expected, result.completedKeys.get(key));
+ }
+
+ private void assertCompletedForMultipleGroups(
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result,
+ Map<String, Map<TopicPartition, OffsetAndMetadata>> expected
+ ) {
+ assertEquals(emptySet(), result.failedKeys.keySet());
+ assertEquals(emptyList(), result.unmappedKeys);
+ for (String g : expected.keySet()) {
+ CoordinatorKey key = CoordinatorKey.byGroupId(g);
+ assertTrue(result.completedKeys.containsKey(key));
+ assertEquals(expected.get(g), result.completedKeys.get(key));
+ }
}
private void assertFailed(
Class<? extends Throwable> expectedExceptionType,
AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result
) {
- CoordinatorKey key = CoordinatorKey.byGroupId(groupId);
+ CoordinatorKey key = CoordinatorKey.byGroupId(groupZero);
assertEquals(emptySet(), result.completedKeys.keySet());
assertEquals(emptyList(), result.unmappedKeys);
assertEquals(singleton(key), result.failedKeys.keySet());
assertTrue(expectedExceptionType.isInstance(result.failedKeys.get(key)));
}
+
+ private void assertFailedForMultipleGroups(
+ Map<String, Class<? extends Throwable>> groupToExceptionMap,
+ AdminApiHandler.ApiResult<CoordinatorKey, Map<TopicPartition,
OffsetAndMetadata>> result
+ ) {
+ assertEquals(emptySet(), result.completedKeys.keySet());
+ assertEquals(emptyList(), result.unmappedKeys);
+ for (String g : groupToExceptionMap.keySet()) {
+ CoordinatorKey key = CoordinatorKey.byGroupId(g);
+ assertTrue(result.failedKeys.containsKey(key));
+
assertTrue(groupToExceptionMap.get(g).isInstance(result.failedKeys.get(key)));
+ }
+ }
+
+ private Set<CoordinatorKey> coordinatorKeys(String... groups) {
+ return Stream.of(groups)
+ .map(CoordinatorKey::byGroupId)
+ .collect(Collectors.toSet());
+ }
+
+ private Set<String> requestGroups(OffsetFetchRequest request) {
+ return request.data().groups()
+ .stream()
+ .map(OffsetFetchRequestGroup::groupId)
+ .collect(Collectors.toSet());
+ }
+
+ private Map<String, Errors> errorMap(Collection<String> groups, Errors
error) {
+ return groups.stream().collect(Collectors.toMap(Function.identity(),
unused -> error));
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index da3acf4983..e7f25345c6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -172,6 +172,7 @@ public class KafkaConsumerTest {
// Set auto commit interval lower than heartbeat so we don't need to deal
with
// a concurrent heartbeat request
private final int autoCommitIntervalMs = 500;
+ private final int throttleMs = 10;
private final String groupId = "mock-group";
private final String memberId = "memberId";
@@ -2434,7 +2435,10 @@ public class KafkaConsumerTest {
partitionData.put(entry.getKey(), new
OffsetFetchResponse.PartitionData(entry.getValue(),
Optional.empty(), "", error));
}
- return new OffsetFetchResponse(Errors.NONE, partitionData);
+ return new OffsetFetchResponse(
+ throttleMs,
+ Collections.singletonMap(groupId, Errors.NONE),
+ Collections.singletonMap(groupId, partitionData));
}
private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long>
offsets) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c65d33176f..db483c6c0f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -71,6 +71,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
@@ -140,6 +141,7 @@ public abstract class ConsumerCoordinatorTest {
private final long retryBackoffMs = 100;
private final int autoCommitIntervalMs = 2000;
private final int requestTimeoutMs = 30000;
+ private final int throttleMs = 10;
private final MockTime time = new MockTime();
private GroupRebalanceConfig rebalanceConfig;
@@ -2872,7 +2874,7 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
metadata, Errors.NONE);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
+ client.prepareResponse(offsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
Map<TopicPartition, OffsetAndMetadata> fetchedOffsets =
coordinator.fetchCommittedOffsets(singleton(t1p),
time.timer(Long.MAX_VALUE));
@@ -2888,7 +2890,7 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new
OffsetFetchResponse.PartitionData(-1, Optional.empty(),
"", Errors.TOPIC_AUTHORIZATION_FAILED);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
+ client.prepareResponse(offsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
TopicAuthorizationException exception =
assertThrows(TopicAuthorizationException.class, () ->
coordinator.fetchCommittedOffsets(singleton(t1p),
time.timer(Long.MAX_VALUE)));
@@ -2901,7 +2903,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
-
client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
+
client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS,
Collections.emptyMap()));
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "",
100L));
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -2916,7 +2918,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
-
client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
+
client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED,
Collections.emptyMap()));
try {
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
fail("Expected group authorization error");
@@ -2959,7 +2961,7 @@ public abstract class ConsumerCoordinatorTest {
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
subscriptions.assignFromUser(singleton(t1p));
- client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
+ client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR,
Collections.emptyMap()));
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "",
100L));
coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));
@@ -3435,7 +3437,11 @@ public abstract class ConsumerCoordinatorTest {
OffsetFetchResponse.PartitionData data = new
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
metadata, Errors.NONE);
- client.prepareResponse(new OffsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
+ if (upperVersion < 8) {
+ client.prepareResponse(new OffsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
+ } else {
+ client.prepareResponse(offsetFetchResponse(Errors.NONE,
singletonMap(t1p, data)));
+ }
if (expectThrows) {
assertThrows(UnsupportedVersionException.class,
() -> coordinator.fetchCommittedOffsets(singleton(t1p),
time.timer(Long.MAX_VALUE)));
@@ -3690,8 +3696,10 @@ public abstract class ConsumerCoordinatorTest {
return new OffsetCommitResponse(responseData);
}
- private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
- return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
+ private OffsetFetchResponse offsetFetchResponse(Errors error,
Map<TopicPartition, PartitionData> responseData) {
+ return new OffsetFetchResponse(throttleMs,
+ singletonMap(groupId, error),
+ singletonMap(groupId, responseData));
}
private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors
partitionLevelError, String metadata, long offset) {
@@ -3701,7 +3709,7 @@ public abstract class ConsumerCoordinatorTest {
private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors
partitionLevelError, String metadata, long offset, Optional<Integer> epoch) {
OffsetFetchResponse.PartitionData data = new
OffsetFetchResponse.PartitionData(offset,
epoch, metadata, partitionLevelError);
- return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data));
+ return offsetFetchResponse(Errors.NONE, singletonMap(tp, data));
}
private OffsetCommitCallback callback(final AtomicBoolean success) {
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 47c1d173b3..d5aee881c9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -18,7 +18,7 @@
package kafka.admin
import java.time.{Duration, Instant}
-import java.util.Properties
+import java.util.{Collections, Properties}
import com.fasterxml.jackson.dataformat.csv.CsvMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import kafka.utils._
@@ -753,9 +753,9 @@ object ConsumerGroupCommand extends Logging {
private def getCommittedOffsets(groupId: String): Map[TopicPartition,
OffsetAndMetadata] = {
adminClient.listConsumerGroupOffsets(
- groupId,
- withTimeoutMs(new ListConsumerGroupOffsetsOptions)
- ).partitionsToOffsetAndMetadata.get.asScala
+ Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec),
+ withTimeoutMs(new ListConsumerGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get().asScala
}
type GroupMetadata = immutable.Map[String, immutable.Map[TopicPartition,
OffsetAndMetadata]]
diff --git
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
index 76a3855a87..44b241a7ed 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
@@ -49,8 +49,8 @@ class ConsumerGroupServiceTest {
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
- when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
- .thenReturn(listGroupOffsetsResult)
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec),
any()))
+ .thenReturn(listGroupOffsetsResult(group))
when(admin.listOffsets(offsetsArgMatcher, any()))
.thenReturn(listOffsetsResult)
@@ -60,7 +60,7 @@ class ConsumerGroupServiceTest {
assertEquals(topicPartitions.size, assignments.get.size)
verify(admin,
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
- verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec),
any())
verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
}
@@ -112,8 +112,10 @@ class ConsumerGroupServiceTest {
future.complete(consumerGroupDescription)
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any()))
.thenReturn(new
DescribeConsumerGroupsResult(Collections.singletonMap(group, future)))
- when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
-
.thenReturn(AdminClientTestUtils.listConsumerGroupOffsetsResult(commitedOffsets))
+
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec),
any()))
+ .thenReturn(
+ AdminClientTestUtils.listConsumerGroupOffsetsResult(
+ Collections.singletonMap(group, commitedOffsets)))
when(admin.listOffsets(
ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any()
@@ -142,7 +144,7 @@ class ConsumerGroupServiceTest {
assertEquals(expectedOffsets, returnedOffsets)
verify(admin,
times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)),
any())
- verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(group), any())
+ verify(admin,
times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec),
any())
verify(admin,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(assignedTopicPartitions)),
any())
verify(admin,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher(unassignedTopicPartitions)),
any())
}
@@ -192,9 +194,9 @@ class ConsumerGroupServiceTest {
new DescribeConsumerGroupsResult(Collections.singletonMap(group, future))
}
- private def listGroupOffsetsResult: ListConsumerGroupOffsetsResult = {
+ private def listGroupOffsetsResult(groupId: String):
ListConsumerGroupOffsetsResult = {
val offsets = topicPartitions.map(_ -> new
OffsetAndMetadata(100)).toMap.asJava
- AdminClientTestUtils.listConsumerGroupOffsetsResult(offsets)
+ AdminClientTestUtils.listConsumerGroupOffsetsResult(Map(groupId ->
offsets).asJava)
}
private def offsetsArgMatcher: util.Map[TopicPartition, OffsetSpec] = {
@@ -217,4 +219,8 @@ class ConsumerGroupServiceTest {
}.toMap
AdminClientTestUtils.describeTopicsResult(topicDescriptions.asJava)
}
+
+ private def listConsumerGroupOffsetsSpec: util.Map[String,
ListConsumerGroupOffsetsSpec] = {
+ Collections.singletonMap(group, new ListConsumerGroupOffsetsSpec())
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 6d17e93782..82c19949e3 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -320,7 +320,7 @@ class RequestQuotaTest extends BaseRequestTest {
)
)
case ApiKeys.OFFSET_FETCH =>
- new OffsetFetchRequest.Builder("test-group", false, List(tp).asJava,
false)
+ new OffsetFetchRequest.Builder(Map("test-group"->
List(tp).asJava).asJava, false, false)
case ApiKeys.FIND_COORDINATOR =>
new FindCoordinatorRequest.Builder(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 02cfb0b49c..5240534ce7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
@@ -695,11 +696,12 @@ public class StoreChangelogReader implements
ChangelogReader {
try {
// those which do not have a committed offset would default to 0
- final ListConsumerGroupOffsetsOptions options = new
ListConsumerGroupOffsetsOptions();
- options.topicPartitions(new ArrayList<>(partitions));
- options.requireStable(true);
- final Map<TopicPartition, Long> committedOffsets =
adminClient.listConsumerGroupOffsets(groupId, options)
- .partitionsToOffsetAndMetadata().get().entrySet()
+ final ListConsumerGroupOffsetsOptions options = new
ListConsumerGroupOffsetsOptions()
+ .requireStable(true);
+ final ListConsumerGroupOffsetsSpec spec = new
ListConsumerGroupOffsetsSpec()
+ .topicPartitions(new ArrayList<>(partitions));
+ final Map<TopicPartition, Long> committedOffsets =
adminClient.listConsumerGroupOffsets(Collections.singletonMap(groupId, spec))
+ .partitionsToOffsetAndMetadata(groupId).get().entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue() == null ? 0L : e.getValue().offset()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1961736620..fbc8d42326 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
@@ -648,12 +649,12 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
final AtomicBoolean functionCalled = new AtomicBoolean(false);
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final String groupId, final
ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec>
groupSpecs, final ListConsumerGroupOffsetsOptions options) {
if (functionCalled.get()) {
- return super.listConsumerGroupOffsets(groupId, options);
+ return super.listConsumerGroupOffsets(groupSpecs, options);
} else {
functionCalled.set(true);
- return
AdminClientTestUtils.listConsumerGroupOffsetsResult(new
TimeoutException("KABOOM!"));
+ return
AdminClientTestUtils.listConsumerGroupOffsetsResult(groupSpecs.keySet().iterator().next(),
new TimeoutException("KABOOM!"));
}
}
};
@@ -708,7 +709,7 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final String groupId, final
ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec>
groupSpecs, final ListConsumerGroupOffsetsOptions options) {
throw kaboom;
}
};
@@ -790,7 +791,7 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
final MockAdminClient adminClient = new MockAdminClient() {
@Override
- public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final String groupId, final
ListConsumerGroupOffsetsOptions options) {
+ public synchronized ListConsumerGroupOffsetsResult
listConsumerGroupOffsets(final Map<String, ListConsumerGroupOffsetsSpec>
groupSpecs, final ListConsumerGroupOffsetsOptions options) {
throw new AssertionError("Should not try to fetch committed
offsets");
}
};