jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1349386383


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isGroupEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no such condition exists.
+     */
+    @Override
+    public Optional<OffsetMetadataManager.OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No consumer exists in the group =>
+                // - If current state timestamp exists and retention period has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(
+                    (offsetAndMetadata, currentTimestamp, offsetsRetentionMs) -> OffsetMetadataManager.isExpiredOffset(
+                        currentTimestamp,
+                        currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs),
+                        offsetAndMetadata.expireTimestampMs,
+                        offsetsRetentionMs
+                    )

Review Comment:
   Thanks for the suggestion! I moved OffsetExpirationCondition to a separate 
file along with a new OffsetExpirationConditionImpl class. Let me know how this 
looks.
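
A minimal sketch of what such a split could look like, for illustration only; it is not the code in the PR. The two type names come from the comment above; the method name `isOffsetExpired`, the base-timestamp function, the simplified `OffsetAndMetadata` stand-in, and the expiration rule itself (an explicit expire timestamp wins, otherwise compare elapsed time against the retention period) are all assumptions.

```java
import java.util.OptionalLong;
import java.util.function.ToLongFunction;

// Hypothetical stand-in for Kafka's OffsetAndMetadata, reduced to the
// two fields referenced in the diff above.
final class OffsetAndMetadata {
    final long commitTimestampMs;
    final OptionalLong expireTimestampMs;

    OffsetAndMetadata(long commitTimestampMs, OptionalLong expireTimestampMs) {
        this.commitTimestampMs = commitTimestampMs;
        this.expireTimestampMs = expireTimestampMs;
    }
}

// The condition as a standalone interface (method name is an assumption).
interface OffsetExpirationCondition {
    boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs);
}

// An implementation parameterized by how the base timestamp is chosen,
// e.g. the group's current state timestamp or the offset's commit timestamp.
final class OffsetExpirationConditionImpl implements OffsetExpirationCondition {
    private final ToLongFunction<OffsetAndMetadata> baseTimestamp;

    OffsetExpirationConditionImpl(ToLongFunction<OffsetAndMetadata> baseTimestamp) {
        this.baseTimestamp = baseTimestamp;
    }

    @Override
    public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs) {
        // Assumed rule: an explicit expire timestamp takes precedence.
        if (offset.expireTimestampMs.isPresent()) {
            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
        }
        // Otherwise the offset expires once the retention period has elapsed
        // since the base timestamp.
        return currentTimestampMs - baseTimestamp.applyAsLong(offset) >= offsetsRetentionMs;
    }
}
```

With this shape, the lambda in the hunk above would become something like `new OffsetExpirationConditionImpl(o -> currentStateTimestamp.orElse(o.commitTimestampMs))`, which keeps the base-timestamp choice in the group class and the comparison in one place.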


