jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349811501


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     * @param offsetsRetentionMs The offset retention in milliseconds.
+     *
+     * @return The group id if the group no longer has any offsets remaining, empty otherwise.
+     */
+    public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return Optional.of(groupId);
+        }
+        try {
+            Group group = groupMetadataManager.group(groupId);
+            ExpirationCondition expirationCondition = group.expirationCondition();
+            Set<TopicPartition> expiredPartitions = new HashSet<>();
+            long currentTimestamp = time.milliseconds();
+            AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+            offsetsByTopic.forEach((topic, partitions) -> {
+                if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I think the main issue is that the existing behavior in 
`GroupMetadata#removeExpiredOffsets` only takes a group's subscribed topics 
into account if the group is using the consumer group protocol AND is Stable. 
If a group is in any other state, it expires offsets as if the group were not 
subscribed to any topic.
   
   Here's my concern with the above suggestion:
   
   Let's say we have an empty group that uses the consumer group protocol. 
`subscribedTopics` will be empty because there are no members (it is set in 
`computeSubscribedTopics`). This will return `true` from `isSubscribedToTopic`. 
This is not aligned with the existing behavior, which says that if a group is 
empty and has a protocol type, we return an empty collection so that the group 
is considered not subscribed to any topics during offset expiration.
   ```
         case Some(_) if is(Empty) =>
           // no consumer exists in the group =>
           ...
           getExpiredOffsets(
             commitRecordMetadataAndOffset => currentStateTimestamp
               .getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp),
             <Empty set for subscribedTopics>
           )
   ```
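
   To make the behavior I'm describing concrete, here is a rough Java sketch of 
the rule as I understand it. It is not the actual implementation: 
`ExpirationSketch`, `GroupState`, `topicsConsideredSubscribed` and 
`eligibleForExpiration` are hypothetical names, and the `"consumer"` protocol 
type string is an assumption. It only models "subscriptions count when the 
group uses the consumer protocol AND is Stable, otherwise treat the group as 
subscribed to nothing".

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; none of these names come from the Kafka code base.
final class ExpirationSketch {
    enum GroupState { EMPTY, STABLE, OTHER }

    // Topics that are treated as subscribed for the purpose of offset expiration.
    // Assumption: subscriptions are only honored for a Stable group using the
    // "consumer" protocol type; every other case yields an empty set.
    static Set<String> topicsConsideredSubscribed(
            Optional<String> protocolType, GroupState state, Set<String> subscribedTopics) {
        boolean usesConsumerProtocol =
            protocolType.isPresent() && protocolType.get().equals("consumer");
        if (usesConsumerProtocol && state == GroupState.STABLE) {
            return subscribedTopics;
        }
        return Collections.emptySet();
    }

    // An offset for `topic` may be considered for expiration only if the topic
    // is not in the set computed above. Retention checks are left out here.
    static boolean eligibleForExpiration(
            String topic, Optional<String> protocolType, GroupState state,
            Set<String> subscribedTopics) {
        return !topicsConsideredSubscribed(protocolType, state, subscribedTopics)
            .contains(topic);
    }
}
```

   Under this model, an empty group with a protocol type never shields a topic 
from expiration, which is the case I want the new code to preserve.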



