apoorvmittal10 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2504681458
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
return;
}
+ computeLagAndBuildResponse(
+ result,
+ requestTopicIdToNameMapping,
+ describeShareGroupOffsetsResponseTopicList,
+ future,
+ readSummaryRequestData.groupId()
+ );
+ });
+ return future;
+ }
+
+ private void computeLagAndBuildResponse(
+ ReadShareGroupStateSummaryResult readSummaryResult,
+ Map<Uuid, String> requestTopicIdToNameMapping,
+
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
describeShareGroupOffsetsResponseTopicList,
+
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
responseFuture,
+ String groupId
+ ) {
+ Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+ Map<TopicIdPartition,
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>
partitionsResponses = new HashMap<>();
+
+ readSummaryResult.topicsData().forEach(topicData -> {
+ topicData.partitions().forEach(partitionData -> {
+ TopicIdPartition tp = new TopicIdPartition(
+ topicData.topicId(),
+ new
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()),
partitionData.partition())
+ );
// Return -1 (uninitialized offset) for the situation where
the persister returned an error.
// This is consistent with OffsetFetch for situations in which
there is no offset information to fetch.
// It's treated as absence of data, rather than an error.
- result.topicsData().forEach(topicData ->
- describeShareGroupOffsetsResponseTopicList.add(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
- .setTopicId(topicData.topicId())
-
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
- .setPartitions(topicData.partitions().stream().map(
- partitionData -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partitionData.partition())
- .setStartOffset(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.startOffset() :
PartitionFactory.UNINITIALIZED_START_OFFSET)
- .setLeaderEpoch(partitionData.errorCode() ==
Errors.NONE.code() ? partitionData.leaderEpoch() :
PartitionFactory.DEFAULT_LEADER_EPOCH)
- ).toList())
- ));
+ if (partitionData.errorCode() != Errors.NONE.code() ||
partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
Review Comment:
Yeah, that makes sense. Please add that as a comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]