AndrewJSchofield commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1950965331
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey>
lookupStrategy() {
return lookupStrategy;
}
+ private void validateKeys(Set<CoordinatorKey> groupIds) {
+ Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+ if (!keys.containsAll(groupIds)) {
+ throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
+ " (expected one of " + keys + ")");
+ }
+ }
+
@Override
public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
- List<String> groupIds = keys.stream().map(key -> {
- if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
- throw new IllegalArgumentException("Invalid group coordinator
key " + key +
- " when building `DescribeShareGroupOffsets` request");
+ validateKeys(keys);
+
+ List<DescribeShareGroupOffsetsRequestGroup> groups = new
ArrayList<>(keys.size());
+ keys.forEach(coordinatorKey -> {
+ String groupId = coordinatorKey.idValue;
+ ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+ DescribeShareGroupOffsetsRequestGroup requestGroup = new
DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId(groupId);
+
+ Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+ spec.topicPartitions().forEach(tp ->
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new
LinkedList<>()).add(tp.partition()));
+
+ Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics =
new HashMap<>();
+ for (TopicPartition tp : spec.topicPartitions()) {
+ requestTopics.computeIfAbsent(tp.topic(), t ->
+ new DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(tp.topic())
+ .setPartitions(new LinkedList<>()))
+ .partitions()
+ .add(tp.partition());
}
- return key.idValue;
- }).collect(Collectors.toList());
- // The DescribeShareGroupOffsetsRequest only includes a single group
ID at this point, which is likely a mistake to be fixing a follow-on PR.
- String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
- if (groupId == null) {
- throw new IllegalArgumentException("Missing group id in request");
- }
- ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
-
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
topics =
- spec.topicPartitions().stream().map(
- topicPartition -> new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
- .setTopicName(topicPartition.topic())
- .setPartitions(List.of(topicPartition.partition()))
- ).collect(Collectors.toList());
+ requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
+ groups.add(requestGroup);
+ });
DescribeShareGroupOffsetsRequestData data = new
DescribeShareGroupOffsetsRequestData()
- .setGroupId(groupId)
- .setTopics(topics);
+ .setGroups(groups);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>>
handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
+ validateKeys(groupIds);
+
final DescribeShareGroupOffsetsResponse response =
(DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new
HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final List<CoordinatorKey> unmapped = new ArrayList<>();
- for (CoordinatorKey groupId : groupIds) {
- Map<TopicPartition, Long> data = new HashMap<>();
- response.data().responses().stream().map(
- describedTopic ->
- describedTopic.partitions().stream().map(
- partition -> {
- if (partition.errorCode() == Errors.NONE.code())
- data.put(new
TopicPartition(describedTopic.topicName(), partition.partitionIndex()),
partition.startOffset());
- else
- log.error("Skipping return offset for topic {}
partition {} due to error {}.", describedTopic.topicName(),
partition.partitionIndex(), Errors.forCode(partition.errorCode()));
- return data;
+ for (CoordinatorKey coordinatorKey : groupIds) {
+ String groupId = coordinatorKey.idValue;
+ if (response.hasGroupError(groupId)) {
+ handleGroupError(coordinatorKey, response.groupError(groupId),
failed, unmapped);
+ } else {
+ Map<TopicPartition, Long> groupOffsetsListing = new
HashMap<>();
+ response.data().groups().stream().filter(g ->
g.groupId().equals(groupId)).forEach(groupResponse -> {
+ for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topicResponse : groupResponse.topics()) {
+ for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
partitionResponse : topicResponse.partitions()) {
+ TopicPartition tp = new
TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
+ if (partitionResponse.errorCode() ==
Errors.NONE.code()) {
+ // Negative offset indicates there is no start
offset for this partition
+ if (partitionResponse.startOffset() < 0) {
+ groupOffsetsListing.put(tp, null);
+ } else {
+ groupOffsetsListing.put(tp,
partitionResponse.startOffset());
+ }
+ } else {
+ log.warn("Skipping return offset for {} due to
error {}: {}.", tp, partitionResponse.errorCode(),
partitionResponse.errorMessage());
+ }
}
- ).collect(Collectors.toList())
- ).collect(Collectors.toList());
- completed.put(groupId, data);
+ }
+ });
+
+ completed.put(coordinatorKey, groupOffsetsListing);
+ }
}
- return new ApiResult<>(completed, failed, Collections.emptyList());
+ return new ApiResult<>(completed, failed, unmapped);
}
private static Set<CoordinatorKey> coordinatorKeys(Collection<String>
groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
+
+ private void handleGroupError(CoordinatorKey groupId,
+ Errors error,
+ Map<CoordinatorKey, Throwable> failed,
+ List<CoordinatorKey> groupsToUnmap) {
+ switch (error) {
+ case GROUP_AUTHORIZATION_FAILED:
+ case UNKNOWN_MEMBER_ID:
+ case STALE_MEMBER_EPOCH:
+ log.debug("`DescribeShareGroupOffsets` request for group id {}
failed due to error {}", groupId.idValue, error);
Review Comment:
This reflects the name of the RPC, and that's `DescribeShareGroupOffsets`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]