apoorvmittal10 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2511666235


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1850,117 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
-                // Return -1 (uninitialized offset) for the situation where the persister returned an error.
-                // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
-                // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                // Now compute lag for each partition and build the final response.
+                computeShareGroupLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeShareGroupLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        // This set keeps track of the partitions for which lag computation is needed.
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    // If the readSummaryResult is successful for a partition, we need to compute lag.
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
+
+        // Fetch latest offsets for all partitions that need lag computation.
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? Map.of() :
+                partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+        // This map groups partitions by topicId for building the final response.
+        Map<Uuid, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>> topicToPartitionResults = new HashMap<>();
+
+        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
+            .whenComplete((result, error) -> {
+                readSummaryResult.topicsData().forEach(topicData -> {
+                    topicData.partitions().forEach(partitionData -> {
+                        TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
+                        // Return -1 (uninitialized offset) for the situation where the persister returned an error.
+                        // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
+                        // It's treated as absence of data, rather than an error. Also, the persister returns startOffset
+                        // as -1 (uninitialized offset) for share partitions for which consumption hasn't begun yet. Thus,
+                        // lag computation is not needed in these situations, and -1 (uninitialized lag) is returned.
+                        DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse;
+                        if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                            partitionResponse = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                                 .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                                .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                                .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                                .setLag(PartitionFactory.UNINITIALIZED_LAG);
+                        } else {
+                            try {
+                                // This code is reached when allOf above is complete, which happens when all the
+                                // individual futures are complete. Thus, the call to join() here is safe.
+                                long partitionLatestOffset = partitionLatestOffsets.get(tp).join();
+                                // Compute lag as (partition end offset - startOffset + 1 - deliveryCompleteCount)
+                                long lag = partitionLatestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
+                                partitionResponse = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    .setPartitionIndex(partitionData.partition())
+                                    .setStartOffset(partitionData.startOffset())
+                                    .setLeaderEpoch(partitionData.leaderEpoch())
+                                    .setLag(lag);
+                            } catch (CompletionException e) {
+                                // If fetching latest offset for a partition failed, return the error in the response for that partition.
+                                partitionResponse = new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                    .setPartitionIndex(partitionData.partition())
+                                    .setErrorCode(Errors.forException(e.getCause()).code())
+                                    .setErrorMessage(e.getCause().getMessage());
+                            }
+                        }
+                        topicToPartitionResults.computeIfAbsent(topicData.topicId(), k -> new ArrayList<>()).add(partitionResponse);
+                    });
+                });
 
-                future.complete(
-                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                        .setGroupId(readSummaryRequestData.groupId())
-                        .setTopics(describeShareGroupOffsetsResponseTopicList));
+                // Build the final response and complete the future.
+                responseFuture.complete(buildDescribeShareGroupOffsetsResponse(
+                    topicToPartitionResults,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    groupId
+                ));
             });
-        return future;
+    }
+
+    private DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup buildDescribeShareGroupOffsetsResponse(
+        Map<Uuid, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>> topicToPartitionResults,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        String groupId
+    ) {
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> responseTopics = new ArrayList<>();
+        for (Map.Entry<Uuid, List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>> entry : topicToPartitionResults.entrySet()) {
+            DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topic =
+                new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicId(entry.getKey())
+                    .setTopicName(requestTopicIdToNameMapping.get(entry.getKey()))
+                    .setPartitions(entry.getValue());
+            responseTopics.add(topic);
+        }

Review Comment:
   Why do you need another pass to fill this? Couldn't it be done in the previous iteration over `readSummaryResult`?
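   
   For illustration, a rough sketch of the idea, reusing the identifiers from the diff above (the `partitionResponse` construction is elided, and this assumes the generated `partitions()` list is mutable):
   
   ```java
   // Sketch: build each response topic during the single pass over
   // readSummaryResult, instead of accumulating topicToPartitionResults
   // and walking it again in buildDescribeShareGroupOffsetsResponse.
   readSummaryResult.topicsData().forEach(topicData -> {
       DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic responseTopic =
           new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
               .setTopicId(topicData.topicId())
               .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()));
       topicData.partitions().forEach(partitionData -> {
           // ... build partitionResponse exactly as in the diff above ...
           responseTopic.partitions().add(partitionResponse);
       });
       describeShareGroupOffsetsResponseTopicList.add(responseTopic);
   });
   ```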



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1851,139 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                // Now compute lag for each partition and build the final response.
+                computeShareGroupLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeShareGroupLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+
+        // This set keeps track of the partitions for which lag computation is needed.
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+        // This map stores the final DescribeShareGroupOffsetsResponsePartition, including the lag, for all the partitions.
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
-                // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                // It's treated as absence of data, rather than an error. Also, the persister returns startOffset
+                // as -1 (uninitialized offset) for share partitions for which consumption hasn't begun yet. Thus,
+                // lag computation is not needed in these situations, and -1 (uninitialized lag) is returned.
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                            .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                            .setLag(PartitionFactory.UNINITIALIZED_LAG)
+                    );
+                } else {
+                    // If the readSummaryResult is successful for a partition, we need to compute lag.
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
+
+        // Fetch latest offsets for all partitions that need lag computation.
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
+                partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
 
-                future.complete(
-                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                        .setGroupId(readSummaryRequestData.groupId())
-                        .setTopics(describeShareGroupOffsetsResponseTopicList));
+        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
+            .whenComplete((result, error) -> {

Review Comment:
   > allOf will complete exceptionally and thus throw an error
   
   It will not throw an exception; you need to check that `error` is not null. You can write a test to verify this as well.
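   
   A self-contained sketch of the behaviour (plain JDK, no Kafka types):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   public class AllOfDemo {
       public static void main(String[] args) {
           CompletableFuture<Long> ok = CompletableFuture.completedFuture(42L);
           CompletableFuture<Long> failed =
               CompletableFuture.failedFuture(new RuntimeException("offset fetch failed"));
   
           // allOf completes exceptionally because one constituent failed, but the
           // whenComplete callback still runs normally: the failure is delivered
           // through the `error` argument (a CompletionException wrapping the
           // cause); nothing is thrown at the caller.
           CompletableFuture.allOf(ok, failed)
               .whenComplete((result, error) -> {
                   if (error != null) {
                       System.out.println("failed: " + error.getCause().getMessage());
                   } else {
                       System.out.println("all futures completed normally");
                   }
               });
       }
   }
   ```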



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
