dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1348539504
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: Hum... Please help me to better understand this one. My understanding is that defaulting to `true` when the subscribed topics is not defined causes an issue for the generic group not using the consumer protocol type. Did I get this right? If my understanding is correct, I wonder if we could actually change the implementation of `GenericGroup#isSubscribedToTopic` to something like this: ``` public boolean isSubscribedToTopic(String topic) { return subscribedTopics.map(topics -> topics.contains(topic)) .orElse(usesConsumerGroupProtocol()); } ``` The idea is to only be conservative if the consumer protocol type is used. Would something like this work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org