chirag-wadhwa5 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2504586332


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 describeShareGroupOffsetsResponseTopicList,
+        
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>
 partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new 
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), 
partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where 
the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which 
there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.startOffset() : 
PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == 
Errors.NONE.code() ? partitionData.leaderEpoch() : 
PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                if (partitionData.errorCode() != Errors.NONE.code() || 
partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {

Review Comment:
   Persister returns startOffset as -1 (uninitialized offset) for share 
partitions for which consumption hasn't begun yet. Thus, lag computation is not 
needed in these situations, since the persister does not yet know from where 
the consumption will begin. So, -1 (uninitialized lag) is returned here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to