jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1348064529
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs; + if (expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + if (currentTimestamp >= expireTimestampMs.getAsLong()) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } + // Current version with no per partition retention + } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } else { + 
hasAllOffsetsExpired.set(false); + } + }); + } + }); + log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); + + if (hasAllOffsetsExpired.get()) { + // All offsets were expired for this group. Remove the group. + return Optional.of(groupId); + } + + } catch (GroupIdNotFoundException e) { + // groups in offsets should exist. + log.warn("GroupId {} should exist.", groupId); + } + + return Optional.empty(); + } + + /** + * Add an offset commit tombstone record for the group. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + * @param records The list of records to append the tombstone. + * + * @return The topic partition of the corresponding tombstone. + */ + private TopicPartition addOffsetCommitTombstone( + String groupId, + String topic, + int partition, + List<Record> records + ) { + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); + TopicPartition tp = new TopicPartition(topic, partition); + log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp); Review Comment: The TopicPartition was created and returned in this method so that we can log all of the expired partitions at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org