chirag-wadhwa5 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2518870406
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,122 @@ private
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
return;
}
- // Return -1 (uninitialized offset) for the situation where
the persister returned an error.
- // This is consistent with OffsetFetch for situations in which
there is no offset information to fetch.
- // It's treated as absence of data, rather than an error.
- result.topicsData().forEach(topicData ->
- describeShareGroupOffsetsResponseTopicList.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
- .setTopicId(topicData.topicId())
-
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
- .setPartitions(topicData.partitions().stream().map(
- partitionData -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ // Now compute lag for each partition and build the final
response.
+ computeShareGroupLagAndBuildResponse(
+ result,
+ requestTopicIdToNameMapping,
+ describeShareGroupOffsetsResponseTopicList,
+ future,
+ readSummaryRequestData.groupId()
+ );
+ });
+ return future;
+ }
+
+ private void computeShareGroupLagAndBuildResponse(
+ ReadShareGroupStateSummaryResult readSummaryResult,
+ Map<Uuid, String> requestTopicIdToNameMapping,
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
describeShareGroupOffsetsResponseTopicList,
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
responseFuture,
+ String groupId
+ ) {
+ // This set keeps track of the partitions for which lag computation is
needed.
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+ readSummaryResult.topicsData().forEach(topicData -> {
+ topicData.partitions().forEach(partitionData -> {
+ if (shouldComputeSharePartitionLag(partitionData)) {
+ // If the readSummaryResult is successful for a partition,
we need to compute lag.
+ partitionsToComputeLag.add(new
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()),
partitionData.partition()));
+ }
+ });
+ });
+
+ // Fetch latest offsets for all partitions that need lag computation.
+ Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets =
partitionsToComputeLag.isEmpty() ? Map.of() :
+
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+ // Final response object to be built. It will include lag information
computed from partitionMetadataClient.
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup
responseGroup =
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId(groupId);
+
+ // List of response topics to be set in the response group.
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
responseTopics = new ArrayList<>();
+
+ CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new
CompletableFuture<?>[0]))
+ .whenComplete((result, error) -> {
+ // The error variable will not be null when one or more of the
partitionLatestOffsets futures get completed exceptionally.
+ // If that is the case, then the same exception would be
caught in the try catch executed below when .join() is called.
+ // Thus, we do not need to check error != null here.
+ readSummaryResult.topicsData().forEach(topicData -> {
+ // Build response for each topic.
+
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topic =
+ new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicId(topicData.topicId())
+
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()));
+
+ // Build response for each partition within the topic.
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>
partitionResponses = new ArrayList<>();
+
+ topicData.partitions().forEach(partitionData -> {
+ TopicPartition tp = new
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()),
partitionData.partition());
+ // For the partitions where lag computation is not
needed, a partitionResponse is built directly.
+ // The lag is set to -1 (uninitialized lag) in these
cases. If the persister returned an error for a
+ // partition, the startOffset is set to -1
(uninitialized offset) and the leaderEpoch is set to 0
+ // (default epoch). This is consistent with
OffsetFetch for situations in which there is no offset
Review Comment:
Apologies for the delay, @chia7712. This seems to be an inconsistency: in
the PartitionFactory class, the same default value of 0 is used in other
error responses as well. I will address this in a separate PR. Thanks for
pointing it out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]