dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348548710


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -623,4 +674,33 @@ public void testReplayGroupMetadataWithNullValue() {
 
         verify(groupMetadataManager, times(1)).replay(key, null);
     }
+
+    @Test
+    public void testScheduleCleanupGroupMetadata() {

Review Comment:
   Do we also need a test which verifies that the implementation does the right 
steps?
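
   For example, something along these lines could verify the interactions (just a sketch; the shard construction and the return type of `groupIds()` are assumptions based on the rest of this class):

```java
@Test
public void testCleanupGroupMetadataInvokesManagers() {
    GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
    OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class);
    // Assumption: the shard is built from the two mocked managers, as in the other tests of this class.
    GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
        new LogContext(),
        groupMetadataManager,
        offsetMetadataManager
    );

    // Assumption: groupIds() returns a collection of group ids.
    when(groupMetadataManager.groupIds()).thenReturn(Collections.singletonList("group-id"));
    when(offsetMetadataManager.cleanupExpiredOffsets(eq("group-id"), anyList())).thenReturn(true);

    coordinator.cleanupGroupMetadata();

    // Expired offsets must be collected for every group id returned by groupIds()...
    verify(offsetMetadataManager, times(1)).cleanupExpiredOffsets(eq("group-id"), anyList());
    // ...and, since cleanupExpiredOffsets returned true, the group should also be
    // considered for deletion (the exact helper name depends on the implementation).
}
```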



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Remove expired offsets for group.
+     *
+     * @param groupId The group id.
+     * @param records The list of records to populate with offset commit tombstone records.
+     *
+     * @return True if no offsets exist or if all offsets expired, false otherwise.
+     */
+    public boolean cleanupExpiredOffsets(String groupId, List<Record> records) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return true;
+        }
+
+        // We expect the group to exist.
+        Group group = groupMetadataManager.group(groupId);
+        Set<TopicPartition> expiredPartitions = new HashSet<>();
+        long currentTimestamp = time.milliseconds();
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
+
+        if (!offsetExpirationCondition.isPresent()) {
+            return false;
+        }
+
+        AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+        OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+        offsetsByTopic.forEach((topic, partitions) -> {
+            if (!group.isSubscribedToTopic(topic, false)) {
+                partitions.forEach((partition, offsetAndMetadata) -> {
+                    if (condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, config.offsetsRetentionMs)) {
+                        expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records));
+                    } else {
+                        hasAllOffsetsExpired.set(false);
+                    }
+                });
+            } else {
+                hasAllOffsetsExpired.set(false);
+            }
+        });
+
+        log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions);

Review Comment:
   I wonder if we should log this with info level. What do you think? We should 
also format `expiredPartitions` correctly here.
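
   For instance (just a sketch of what I mean):

```java
log.info("[GroupId {}] Expiring offsets of partitions: {}", groupId,
    expiredPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
```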



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -417,6 +441,40 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }
 
+    /**
+     * For each group, remove all expired offsets. If all offsets for the group are removed and the group is eligible
+     * for deletion, delete the group.
+     *
+     * @return The list of tombstones (offset commit and group metadata) to append.
+     */
+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        groupMetadataManager.groupIds()
+            .forEach(groupId -> {

Review Comment:
   nit: Let's bring `forEach` on the previous line.
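
   i.e.:

```java
groupMetadataManager.groupIds().forEach(groupId -> {
    // ...
});
```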



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isGroupEmpty() {

Review Comment:
   nit: `isEmpty`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -282,6 +311,17 @@ public void commitOffset(
             int partition,
             long offset,
             int leaderEpoch
+        ) {
+            commitOffset(groupId, topic, partition, offset, leaderEpoch, time.milliseconds());
+
+        }
+        public void commitOffset(

Review Comment:
   nit: Let's add an empty line before the method declaration.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1085,6 +1093,101 @@ public void testValidateDeleteGroup() {
         assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup);
     }
 
+    @Test
+    public void testOffsetExpirationCondition() {
+        MockedStatic<OffsetMetadataManager> offsetMetadataManager = mockStatic(OffsetMetadataManager.class);
+        long currentTimestamp = 30000L;
+        long commitTimestamp = 20000L;
+        long offsetsRetentionMs = 10000L;
+        OptionalLong expireTimestamp = OptionalLong.of(35000);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, expireTimestamp);
+        MockTime time = new MockTime();
+        long currentStateTimestamp = time.milliseconds();
+        GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time);
+
+        // 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp.
+        Optional<OffsetMetadataManager.OffsetExpirationCondition> condition = group.offsetExpirationCondition();
+        assertTrue(condition.isPresent());
+
+        condition.get().isOffsetExpired(

Review Comment:
   Should we assert the returned value? This applies to others as well.
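
   For example, for the first case (a sketch; with `expireTimestamp` at 35000 still ahead of `currentTimestamp` at 30000, the offset should not be reported as expired yet):

```java
assertFalse(condition.get().isOffsetExpired(offsetAndMetadata, currentTimestamp, offsetsRetentionMs));
```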



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -265,6 +293,7 @@ public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
             return response.topics();
         }
 
+

Review Comment:
   nit: This empty line could be removed.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
         assertEquals(3, numDeleteOffsets);
     }
 
+    @Test
+    public void testIsExpiredOffset() {
+        long currentTimestamp = 1000L;
+        long baseTimestamp = 500L;
+        OptionalLong expireTimestampMs = OptionalLong.of(1500);
+        long offsetsRetentionMs = 500L;
+
+        // Current timestamp >= expire timestamp => should expire
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp < expire timestamp => should not expire
+        currentTimestamp = 499;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Expire timestamp does not exist (current version with no per partition retention)
+        // Current timestamp - base timestamp >= offsets retention => should expire
+        expireTimestampMs = OptionalLong.empty();
+        currentTimestamp = 1000L;
+        assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp - base timestamp < offsets retention => should not expire
+        currentTimestamp = 999L;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+            .build();
+
+        List<Record> records = new ArrayList<>();
+        assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+        assertEquals(Collections.emptyList(), records);
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition() {

Review Comment:
   Is `GroupEmpty` correct here?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -70,47 +70,66 @@
 import java.util.OptionalLong;
 
 import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition.DEFAULT_OFFSET_EXPIRATION_CONDITION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class OffsetMetadataManagerTest {
     static class OffsetMetadataManagerTestContext {
         public static class Builder {
-            final private MockTime time = new MockTime();
-            final private MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time);
-            final private LogContext logContext = new LogContext();
-            final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+            private final MockTime time = new MockTime();
+            private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time);
+            private final LogContext logContext = new LogContext();
+            private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+            private GroupMetadataManager groupMetadataManager = null;
             private MetadataImage metadataImage = null;
-            private int offsetMetadataMaxSize = 4096;
+            private GroupCoordinatorConfig config = null;
 
             Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
-                this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000);
+                return this;
+            }
+
+            Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
+                config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs);

Review Comment:
   I am not a big fan of this because it won't scale if we add more params. It seems to me that we should have a config builder in the test package with sane defaults for tests. Then we could pass the config object here or instantiate the default one. What do you think?
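
   Roughly what I have in mind, as a sketch (the builder name is illustrative and the defaults just mirror the values used above):

```java
// Hypothetical test-only builder with sane defaults for tests.
public class GroupCoordinatorConfigBuilder {
    private int offsetMetadataMaxSize = 4096;
    private long offsetsRetentionCheckIntervalMs = 60000L;
    private long offsetsRetentionMs = 24 * 60 * 1000;

    public GroupCoordinatorConfigBuilder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
        this.offsetMetadataMaxSize = offsetMetadataMaxSize;
        return this;
    }

    public GroupCoordinatorConfigBuilder withOffsetsRetentionMs(long offsetsRetentionMs) {
        this.offsetsRetentionMs = offsetsRetentionMs;
        return this;
    }

    public GroupCoordinatorConfig build() {
        return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(
            offsetMetadataMaxSize, offsetsRetentionCheckIntervalMs, offsetsRetentionMs);
    }
}
```

   The test context builder could then take a `GroupCoordinatorConfig` directly (or build the default one) instead of growing one `withX` method per config value.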
   
   I am fine with doing this as a follow-up though.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
         assertEquals(3, numDeleteOffsets);
     }
 
+    @Test
+    public void testIsExpiredOffset() {
+        long currentTimestamp = 1000L;
+        long baseTimestamp = 500L;
+        OptionalLong expireTimestampMs = OptionalLong.of(1500);
+        long offsetsRetentionMs = 500L;
+
+        // Current timestamp >= expire timestamp => should expire
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp < expire timestamp => should not expire
+        currentTimestamp = 499;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Expire timestamp does not exist (current version with no per partition retention)
+        // Current timestamp - base timestamp >= offsets retention => should expire
+        expireTimestampMs = OptionalLong.empty();
+        currentTimestamp = 1000L;
+        assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp - base timestamp < offsets retention => should not expire
+        currentTimestamp = 999L;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+            .build();
+
+        List<Record> records = new ArrayList<>();
+        assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));

Review Comment:
   Shouldn't this throw a `GroupIdNotFoundException`?
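
   i.e. something like this (a sketch, assuming the lookup goes through `GroupMetadataManager#group`, which throws for unknown group ids):

```java
assertThrows(GroupIdNotFoundException.class,
    () -> context.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>()));
```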



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record> records) {
         records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
     }
 
+    @Override
+    public boolean isGroupEmpty() {
+        return isInState(EMPTY);
+    }
+
+    /**
+     * Return the offset expiration condition to be used for this group. This is based on several factors
+     * such as the group state, the protocol type, and the GroupMetadata record version.
+     *
+     * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition}
+     *
+     * @return The offset expiration condition for the group or Empty if no such condition exists.
+     */
+    @Override
+    public Optional<OffsetMetadataManager.OffsetExpirationCondition> offsetExpirationCondition() {
+        if (protocolType.isPresent()) {
+            if (isInState(EMPTY)) {
+                // No consumer exists in the group =>
+                // - If current state timestamp exists and retention period has passed since group became Empty,
+                //   expire all offsets with no pending offset commit;
+                // - If there is no current state timestamp (old group metadata schema) and retention period has passed
+                //   since the last commit timestamp, expire the offset
+                return Optional.of(
+                    (offsetAndMetadata, currentTimestamp, offsetsRetentionMs) -> OffsetMetadataManager.isExpiredOffset(
+                        currentTimestamp,
+                        currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs),
+                        offsetAndMetadata.expireTimestampMs,
+                        offsetsRetentionMs
+                    )

Review Comment:
   The dependency between the classes feels a bit weird here. The 
OffsetMetadataManager calls offsetExpirationCondition which returns an 
OffsetExpirationCondition which calls the OffsetMetadataManager. I was not 
expecting this when I suggested it.
   
   We could perhaps have an implementation of the OffsetExpirationCondition 
which contains the logic of `OffsetMetadataManager.isExpiredOffset` and takes a 
function to extract the timestamp from the record. Here we could just 
instantiate the new class with the correct way to get the timestamp. Would it 
make sense?
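
   Roughly something like this, as a sketch (names are illustrative; the expiration logic just mirrors `isExpiredOffset`):

```java
public class OffsetExpirationConditionImpl implements OffsetExpirationCondition {
    // Extracts the base timestamp from the committed offset, e.g.
    // offset -> currentStateTimestamp.orElse(offset.commitTimestampMs) for a generic group.
    private final Function<OffsetAndMetadata, Long> baseTimestamp;

    public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) {
        this.baseTimestamp = baseTimestamp;
    }

    @Override
    public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs, long offsetsRetentionMs) {
        if (offset.expireTimestampMs.isPresent()) {
            // Older versions with an explicit per-partition expire timestamp.
            return currentTimestampMs >= offset.expireTimestampMs.getAsLong();
        } else {
            return currentTimestampMs - baseTimestamp.apply(offset) >= offsetsRetentionMs;
        }
    }
}
```

   `GenericGroup#offsetExpirationCondition` would then just return `Optional.of(new OffsetExpirationConditionImpl(offset -> currentStateTimestamp.orElse(offset.commitTimestampMs)))` and would not need to call back into `OffsetMetadataManager` at all.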



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
         assertEquals(3, numDeleteOffsets);
     }
 
+    @Test
+    public void testIsExpiredOffset() {
+        long currentTimestamp = 1000L;
+        long baseTimestamp = 500L;
+        OptionalLong expireTimestampMs = OptionalLong.of(1500);
+        long offsetsRetentionMs = 500L;
+
+        // Current timestamp >= expire timestamp => should expire
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));

Review Comment:
   Either the comment or the assertion is incorrect.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
         assertEquals(3, numDeleteOffsets);
     }
 
+    @Test
+    public void testIsExpiredOffset() {
+        long currentTimestamp = 1000L;
+        long baseTimestamp = 500L;
+        OptionalLong expireTimestampMs = OptionalLong.of(1500);
+        long offsetsRetentionMs = 500L;
+
+        // Current timestamp >= expire timestamp => should expire
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp < expire timestamp => should not expire
+        currentTimestamp = 499;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Expire timestamp does not exist (current version with no per partition retention)
+        // Current timestamp - base timestamp >= offsets retention => should expire
+        expireTimestampMs = OptionalLong.empty();
+        currentTimestamp = 1000L;
+        assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+        // Current timestamp - base timestamp < offsets retention => should not expire
+        currentTimestamp = 999L;
+        assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+            .build();
+
+        List<Record> records = new ArrayList<>();
+        assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+        assertEquals(Collections.emptyList(), records);
+    }
+
+    @Test
+    public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition() {
+        GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
+        Group group = mock(Group.class);
+
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
+            .withGroupMetadataManager(groupMetadataManager)
+            .build();
+
+        context.commitOffset("group-id", "topic", 0, 100L, 0);
+
+        when(groupMetadataManager.group("group-id")).thenReturn(group);
+        when(group.offsetExpirationCondition()).thenReturn(Optional.empty());
+
+        List<Record> records = new ArrayList<>();
+        assertFalse(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(Collections.emptyList(), records);
+    }
+
+    @Test
+    public void testCleanupExpiredOffsets() {

Review Comment:
   Here we basically test the default condition, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
