dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1351957160


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -106,7 +107,8 @@ void validateOffsetFetch(
     /**
      * Returns true if the group is actively subscribed to the topic.
      *
-     * @param topic The topic name.
+     * @param topic                            The topic name.

Review Comment:
   nit: We can remove a few tabs here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -349,9 +351,11 @@ public Set<String> subscribedTopicNames() {
     /**
      * Returns true if the consumer group is actively subscribed to the topic.
      *
-     * @param topic The topic name.
-     * @return whether the group is subscribed to the topic.
+     * @param topic                            The topic name.

Review Comment:
   nit: Alignment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import java.util.function.Function;
+
+public class OffsetExpirationConditionImpl implements OffsetExpirationCondition {
+
+    /**
+     * Given an offset and metadata, obtain the base timestamp that should be used
+     * as the start of the offsets retention period.
+     */
+    private final Function<OffsetAndMetadata, Long> baseTimestamp;
+
+    public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) {
+        this.baseTimestamp = baseTimestamp;
+    }
+
+    /**
+     * Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this
+     * exists, compare against the current timestamp. Otherwise, use the base timestamp (either commit timestamp
+     * or current state timestamp if group is empty for generic groups) and check whether the offset has
+     * exceeded the offset retention.
+     *
+     * @param offset              The offset and metadata.
+     * @param currentTimestampMs    The current timestamp.

Review Comment:
   nit: Alignment.
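As a reference point for the Javadoc above, the expiration rule it describes can be sketched as follows. This is a minimal, self-contained sketch under stated assumptions, not the PR's implementation: `SimpleOffsetAndMetadata` is a hypothetical stand-in for `OffsetAndMetadata` with a `commitTimestampMs` field and an optional per-partition `expireTimestampMs`, and `SketchExpirationCondition` is a hypothetical name.

```java
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical, stripped-down stand-in for the coordinator's OffsetAndMetadata.
final class SimpleOffsetAndMetadata {
    final long commitTimestampMs;
    // Assumption: only present for offsets committed by older versions that
    // carried an explicit per-partition expire timestamp.
    final OptionalLong expireTimestampMs;

    SimpleOffsetAndMetadata(long commitTimestampMs, OptionalLong expireTimestampMs) {
        this.commitTimestampMs = commitTimestampMs;
        this.expireTimestampMs = expireTimestampMs;
    }
}

// Hypothetical sketch of the condition described in the Javadoc above.
final class SketchExpirationCondition {
    // Picks the timestamp from which the retention period is counted.
    private final Function<SimpleOffsetAndMetadata, Long> baseTimestamp;

    SketchExpirationCondition(Function<SimpleOffsetAndMetadata, Long> baseTimestamp) {
        this.baseTimestamp = baseTimestamp;
    }

    boolean isOffsetExpired(SimpleOffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs) {
        if (offset.expireTimestampMs.isPresent()) {
            // Older versions: compare the explicit expire timestamp with "now".
            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
        }
        // Otherwise: expired once the retention period has elapsed since the base timestamp.
        return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs;
    }
}
```

For example, `new SketchExpirationCondition(o -> o.commitTimestampMs)` counts retention from the last commit, which is the base timestamp the PR passes for simple consumers.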



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -1063,16 +1105,17 @@ public Optional<Set<String>> subscribedTopics() {
     }
 
     /**
-     * Returns true if the consumer group is actively subscribed to the topic. When the consumer
+     * Returns true if the consumer group is actively subscribed to the topic. When the generic
      * group does not know, because the information is not available yet or because it has
-     * failed to parse the Consumer Protocol, it returns true to be safe.
+     * failed to parse the Consumer Protocol, it returns whether the group is using the consumer protocol.
+     *
+     * @param topic                            The topic name.

Review Comment:
   nit: Alignment.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -1063,16 +1105,17 @@ public Optional<Set<String>> subscribedTopics() {
     }
 
     /**
-     * Returns true if the consumer group is actively subscribed to the topic. When the consumer
+     * Returns true if the consumer group is actively subscribed to the topic. When the generic
      * group does not know, because the information is not available yet or because it has
-     * failed to parse the Consumer Protocol, it returns true to be safe.
+     * failed to parse the Consumer Protocol, it returns whether the group is using the consumer protocol.

Review Comment:
   nit: `it returns whether the group is using the consumer protocol.` It would 
be great to expand a little more on this. We basically want to be conservative 
for consumer groups but not for other types, etc.
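One way to state the conservative behavior the comment asks the Javadoc to spell out: when the subscription is known, answer precisely; when it is unknown, assume the topic is subscribed only if the group uses the consumer protocol, so that offsets of consumer groups are not expired by mistake. A minimal sketch, assuming a hypothetical `SubscriptionCheckSketch` class that holds only the two pieces of state the rule needs (it is not the PR's `GenericGroup`):

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical reduction of a generic group to the state the rule needs.
final class SubscriptionCheckSketch {
    // Empty when the subscription is unknown: not available yet, or the
    // Consumer Protocol metadata could not be parsed.
    private final Optional<Set<String>> subscribedTopics;
    private final boolean usesConsumerGroupProtocol;

    SubscriptionCheckSketch(Optional<Set<String>> subscribedTopics, boolean usesConsumerGroupProtocol) {
        this.subscribedTopics = subscribedTopics;
        this.usesConsumerGroupProtocol = usesConsumerGroupProtocol;
    }

    boolean isSubscribedToTopic(String topic) {
        // Known subscription: answer precisely.
        // Unknown subscription: assume "subscribed" only for groups using the
        // consumer protocol, so their offsets are kept; other protocol types
        // get no such protection.
        return subscribedTopics
            .map(topics -> topics.contains(topic))
            .orElse(usesConsumerGroupProtocol);
    }
}
```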



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or empty if no such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No consumer exists in the group =>
+                // - If current state timestamp exists and retention period has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(new OffsetExpirationConditionImpl(
+                    offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)));

Review Comment:
   nit: Let's put the closing parenthesis on a new line to keep the format 
consistent with the rest of the code.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -544,6 +550,77 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     *
+     * @return True if no offsets exist or if all offsets expired, false otherwise.
+     */
+    public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return true;
+        }
+
+        // We expect the group to exist.
+        Group group = groupMetadataManager.group(groupId);
+        Set<String> expiredPartitions = new HashSet<>();
+        long currentTimestampMs = time.milliseconds();
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
+
+        if (!offsetExpirationCondition.isPresent()) {
+            return false;
+        }
+
+        AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+        OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+        offsetsByTopic.forEach((topic, partitions) -> {
+            if (!group.isSubscribedToTopic(topic)) {
+                partitions.forEach((partition, offsetAndMetadata) -> {
+                    if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs)) {
+                        expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString());
+                    } else {
+                        hasAllOffsetsExpired.set(false);
+                    }
+                });
+            } else {
+                hasAllOffsetsExpired.set(false);
+            }
+        });
+
+        if (!expiredPartitions.isEmpty()) {
+            log.info("[GroupId {}] hasAllOffsetsExpired={}; Expiring offsets of partitions: {}",

Review Comment:
   nit: `[GroupId {}] Expiring offsets of partitions (expiredAllOffsets={}): {}` may be a bit better. What do you think?
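The expiration pass in `cleanupExpiredOffsets` can be sketched without the timeline data structures. A minimal sketch under hypothetical, simplified inputs (not the PR's code): offsets are plain maps of topic to partition to commit timestamp, and subscription and expiry are passed in as predicates. Expired partitions are rendered as `topic-partition`, the same shape as `TopicPartition.toString()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of one expiration pass over a group's committed offsets.
final class ExpirationPassSketch {
    // Result of the pass: which partitions expired and whether everything did.
    static final class Result {
        final List<String> expiredPartitions = new ArrayList<>();
        boolean allOffsetsExpired = true;
    }

    // offsetsByTopic: topic -> (partition -> commit timestamp).
    static Result expire(Map<String, Map<Integer, Long>> offsetsByTopic,
                         Predicate<String> isSubscribedToTopic,
                         Predicate<Long> isExpired) {
        Result result = new Result();
        offsetsByTopic.forEach((topic, partitions) -> {
            if (isSubscribedToTopic.test(topic)) {
                // Offsets of actively subscribed topics are never expired.
                result.allOffsetsExpired = false;
                return;
            }
            partitions.forEach((partition, commitTimestampMs) -> {
                if (isExpired.test(commitTimestampMs)) {
                    // In the PR, this is where an offset commit tombstone is appended.
                    result.expiredPartitions.add(topic + "-" + partition);
                } else {
                    result.allOffsetsExpired = false;
                }
            });
        });
        return result;
    }
}
```

The mutable `Result` holder plays the role of the `AtomicBoolean` in the PR; both exist because a lambda cannot reassign a captured local variable. With the suggested message format, the three placeholders would be filled by the group id, `allOffsetsExpired`, and `expiredPartitions`.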



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1085,6 +1091,88 @@ public void testValidateDeleteGroup() {
         assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup);

Review Comment:
   Should we also add test to cover changes in isSubscribedToTopic?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1085,6 +1091,88 @@ public void testValidateDeleteGroup() {
         assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup);
     }
 
+    @Test
+    public void testOffsetExpirationCondition() {

Review Comment:
   Should we also add a small test to ConsumerGroupTest?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or empty if no such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No consumer exists in the group =>
+                // - If current state timestamp exists and retention period has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(new OffsetExpirationConditionImpl(
+                    offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)));
+            } else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) {
+                // Consumers exist in the group and group is Stable =>
+                // - If the group is aware of the subscribed topics and retention period has passed since the
+                //   last commit timestamp, expire the offset. Offsets with pending offset commits are not
+                //   expired
+                return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
+            }
+        } else {
+            // protocolType is None => standalone (simple) consumer that uses Kafka for offset storage only;
+            // expire offsets where retention period has passed since their last commit
+            return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));

Review Comment:
   same question.
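The branches of `offsetExpirationCondition()` in the hunk above form a small decision table, which is also a convenient shape for the tests requested in the comments above. The sketch below restates it with hypothetical, simplified types and is not the PR's code: `State` is a local enum mirroring the generic group states, and the condition is reduced to a `LongUnaryOperator` that maps an offset's commit timestamp to the base timestamp retention is counted from; `Optional.empty()` means no offset is expired in that state.

```java
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.LongUnaryOperator;

// Hypothetical restatement of the generic group's expiration decision table.
final class GenericGroupExpirationSketch {
    // Hypothetical mirror of the generic group states.
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // Returns commit timestamp -> base timestamp, or empty when nothing may expire.
    static Optional<LongUnaryOperator> baseTimestamp(
            Optional<String> protocolType,
            State state,
            boolean usesConsumerGroupProtocol,
            boolean subscribedTopicsKnown,
            OptionalLong currentStateTimestamp) {
        if (!protocolType.isPresent()) {
            // Simple (standalone) consumer: count retention from the last commit.
            return Optional.of(LongUnaryOperator.identity());
        }
        if (state == State.EMPTY) {
            // Empty group: count from when the group became Empty; old GroupMetadata
            // records carry no state timestamp, so fall back to the commit timestamp.
            return Optional.of(commitTimestampMs -> currentStateTimestamp.orElse(commitTimestampMs));
        }
        if (usesConsumerGroupProtocol && subscribedTopicsKnown && state == State.STABLE) {
            // Stable consumer group with a known subscription: count from the last commit.
            return Optional.of(LongUnaryOperator.identity());
        }
        // Any other case (e.g. rebalancing): no offsets are expired.
        return Optional.empty();
    }
}
```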



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,46 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or empty if no such condition exists.
+     */
+    @Override
+    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No consumer exists in the group =>
+                // - If current state timestamp exists and retention period has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(new OffsetExpirationConditionImpl(
+                    offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs)));
+            } else if (usesConsumerGroupProtocol() && subscribedTopics.isPresent() && isInState(STABLE)) {
+                // Consumers exist in the group and group is Stable =>
+                // - If the group is aware of the subscribed topics and retention period has passed since the
+                //   last commit timestamp, expire the offset. Offsets with pending offset commits are not
+                //   expired
+                return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));

Review Comment:
   nit: Does `offsetAndMetadata::commitTimestampMs` work here?
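On the method-reference question: a Java method reference must resolve to a method, not a field. Since the lambda reads `offsetAndMetadata.commitTimestampMs` as a field, and `offsetAndMetadata` only exists as the lambda parameter, the lowercase form would not resolve; an unbound reference `OffsetAndMetadata::commitTimestampMs` would only compile if the class exposed an accessor method of that name, which is an assumption about a class not shown here. The sketch below illustrates the distinction with a hypothetical `Commit` class.

```java
import java.util.function.Function;

// Hypothetical class with both a public field and an accessor of the same name
// (fields and methods live in separate namespaces in Java).
final class Commit {
    public final long commitTimestampMs;

    Commit(long commitTimestampMs) {
        this.commitTimestampMs = commitTimestampMs;
    }

    // Accessor that makes a method reference possible.
    long commitTimestampMs() {
        return commitTimestampMs;
    }
}

final class MethodRefSketch {
    // Lambda reading the field directly: compiles with or without the accessor.
    static final Function<Commit, Long> VIA_FIELD = c -> c.commitTimestampMs;

    // Unbound method reference: requires the accessor above; Commit::commitTimestampMs
    // would not compile if only the field existed.
    static final Function<Commit, Long> VIA_ACCESSOR = Commit::commitTimestampMs;
}
```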



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+    /**
+     * Given an offset metadata and offsets retention, return whether the offset is expired or not.
+     *
+     * @param offset               The offset metadata.
+     * @param currentTimestampMs     The current timestamp.

Review Comment:
   nit: Alignment.


