dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1348548710
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java: ########## @@ -623,4 +674,33 @@ public void testReplayGroupMetadataWithNullValue() { verify(groupMetadataManager, times(1)).replay(key, null); } + + @Test + public void testScheduleCleanupGroupMetadata() { Review Comment: Do we also need a test which verifies that the implementation does the right steps? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * + * @return True if no offsets exist or if all offsets expired, false otherwise. + */ + public boolean cleanupExpiredOffsets(String groupId, List<Record> records) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return true; + } + + // We expect the group to exist. 
+ Group group = groupMetadataManager.group(groupId); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + + if (!offsetExpirationCondition.isPresent()) { + return false; + } + + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + OffsetExpirationCondition condition = offsetExpirationCondition.get(); + + offsetsByTopic.forEach((topic, partitions) -> { + if (!group.isSubscribedToTopic(topic, false)) { + partitions.forEach((partition, offsetAndMetadata) -> { + if (condition.isOffsetExpired(offsetAndMetadata, currentTimestamp, config.offsetsRetentionMs)) { + expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records)); + } else { + hasAllOffsetsExpired.set(false); + } + }); + } else { + hasAllOffsetsExpired.set(false); + } + }); + + log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); Review Comment: I wonder if we should log this with info level. What do you think? We should also format `expiredPartitions` correctly here. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -417,6 +441,40 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( return offsetMetadataManager.deleteOffsets(request); } + /** + * For each group, remove all expired offsets. If all offsets for the group is removed and the group is eligible + * for deletion, delete the group. + * + * @return The list of tombstones (offset commit and group metadata) to append. + */ + public CoordinatorResult<Void, Record> cleanupGroupMetadata() { + List<Record> records = new ArrayList<>(); + groupMetadataManager.groupIds() + .forEach(groupId -> { Review Comment: nit: Let's bring `forEach` on the previous line. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isGroupEmpty() { Review Comment: nit: `isEmpty`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -282,6 +311,17 @@ public void commitOffset( int partition, long offset, int leaderEpoch + ) { + commitOffset(groupId, topic, partition, offset, leaderEpoch, time.milliseconds()); + + } + public void commitOffset( Review Comment: nit: Let's add an empty line before the method declaration. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java: ########## @@ -1085,6 +1093,101 @@ public void testValidateDeleteGroup() { assertThrows(GroupIdNotFoundException.class, group::validateDeleteGroup); } + @Test + public void testOffsetExpirationCondition() { + MockedStatic<OffsetMetadataManager> offsetMetadataManager = mockStatic(OffsetMetadataManager.class); + long currentTimestamp = 30000L; + long commitTimestamp = 20000L; + long offsetsRetentionMs = 10000L; + OptionalLong expireTimestamp = OptionalLong.of(35000); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, expireTimestamp); + MockTime time = new MockTime(); + long currentStateTimestamp = time.milliseconds(); + GenericGroup group = new GenericGroup(new LogContext(), "groupId", EMPTY, time); + + // 1. Test no protocol type. Simple consumer case, Base timestamp based off of last commit timestamp. + Optional<OffsetMetadataManager.OffsetExpirationCondition> condition = group.offsetExpirationCondition(); + assertTrue(condition.isPresent()); + + condition.get().isOffsetExpired( Review Comment: Should we assert the returned value? 
This applies to others as well. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -265,6 +293,7 @@ public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets( return response.topics(); } + Review Comment: nit: This empty line could be removed. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) { assertEquals(3, numDeleteOffsets); } + @Test + public void testIsExpiredOffset() { + long currentTimestamp = 1000L; + long baseTimestamp = 500L; + OptionalLong expireTimestampMs = OptionalLong.of(1500); + long offsetsRetentionMs = 500L; + + // Current timestamp >= expire timestamp => should expire + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp < expire timestamp => should not expire + currentTimestamp = 499; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Expire timestamp does not exist (current version with no per partition retention) + // Current timestamp - base timestamp >= offsets retention => should expire + expireTimestampMs = OptionalLong.empty(); + currentTimestamp = 1000L; + assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp - base timestamp < offsets retention => should not expire + currentTimestamp = 999L; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + } + + @Test + public void testCleanupExpiredOffsetsGroupDoesNotExist() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .build(); + + List<Record> records = 
new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition() { Review Comment: Is `GroupEmpty` correct here? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -70,47 +70,66 @@ import java.util.OptionalLong; import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; +import static org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition.DEFAULT_OFFSET_EXPIRATION_CONDITION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OffsetMetadataManagerTest { static class OffsetMetadataManagerTestContext { public static class Builder { - final private MockTime time = new MockTime(); - final private MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time); - final private LogContext logContext = new LogContext(); - final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private final MockTime time = new MockTime(); + private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(time); + private final LogContext logContext = new LogContext(); + private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; - private int offsetMetadataMaxSize = 4096; + private GroupCoordinatorConfig config = 
null; Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { - this.offsetMetadataMaxSize = offsetMetadataMaxSize; + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60 * 1000); + return this; + } + + Builder withOffsetsRetentionMs(long offsetsRetentionMs) { + config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMs); Review Comment: I am not a big fan of this because it won't scale if we add more params. It seems to me that we should have a config builder in the test package with sane defaults for tests. Then we could pass the config object here or instantiate the default one. What do you think? I am fine with doing this as a follow-up though. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) { assertEquals(3, numDeleteOffsets); } + @Test + public void testIsExpiredOffset() { + long currentTimestamp = 1000L; + long baseTimestamp = 500L; + OptionalLong expireTimestampMs = OptionalLong.of(1500); + long offsetsRetentionMs = 500L; + + // Current timestamp >= expire timestamp => should expire + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp < expire timestamp => should not expire + currentTimestamp = 499; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Expire timestamp does not exist (current version with no per partition retention) + // Current timestamp - base timestamp >= offsets retention => should expire + expireTimestampMs = OptionalLong.empty(); + currentTimestamp = 1000L; + assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp 
- base timestamp < offsets retention => should not expire + currentTimestamp = 999L; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + } + + @Test + public void testCleanupExpiredOffsetsGroupDoesNotExist() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .build(); + + List<Record> records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); Review Comment: Shouldn't this throw a `GroupIdNotFoundException` exception? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isGroupEmpty() { + return isInState(EMPTY); + } + + /** + * Return the offset expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition} + * + * @return The offset expiration condition for the group or Empty of no such condition exists. 
+ */ + @Override + public Optional<OffsetMetadataManager.OffsetExpirationCondition> offsetExpirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return Optional.of( + (offsetAndMetadata, currentTimestamp, offsetsRetentionMs) -> OffsetMetadataManager.isExpiredOffset( + currentTimestamp, + currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs), + offsetAndMetadata.expireTimestampMs, + offsetsRetentionMs + ) Review Comment: The dependency between the classes feels a bit weird here. The OffsetMetadataManager calls offsetExpirationCondition which returns an OffsetExpirationCondition which calls the OffsetMetadataManager. I was not expecting this when I suggested it. We could perhaps have an implementation of the OffsetExpirationCondition which contains the logic of `OffsetMetadataManager.isExpiredOffset` and takes a function to extract the timestamp from the record. Here we could just instantiate the new class with the correct way to get the timestamp. Would it make sense? 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) { assertEquals(3, numDeleteOffsets); } + @Test + public void testIsExpiredOffset() { + long currentTimestamp = 1000L; + long baseTimestamp = 500L; + OptionalLong expireTimestampMs = OptionalLong.of(1500); + long offsetsRetentionMs = 500L; + + // Current timestamp >= expire timestamp => should expire + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); Review Comment: Either the comment or the assertion is incorrect. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType groupType) { assertEquals(3, numDeleteOffsets); } + @Test + public void testIsExpiredOffset() { + long currentTimestamp = 1000L; + long baseTimestamp = 500L; + OptionalLong expireTimestampMs = OptionalLong.of(1500); + long offsetsRetentionMs = 500L; + + // Current timestamp >= expire timestamp => should expire + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp < expire timestamp => should not expire + currentTimestamp = 499; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Expire timestamp does not exist (current version with no per partition retention) + // Current timestamp - base timestamp >= offsets retention => should expire + expireTimestampMs = OptionalLong.empty(); + currentTimestamp = 1000L; + assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + + // Current timestamp - base timestamp < offsets retention => should not 
expire + currentTimestamp = 999L; + assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp, baseTimestamp, expireTimestampMs, offsetsRetentionMs)); + } + + @Test + public void testCleanupExpiredOffsetsGroupDoesNotExist() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .build(); + + List<Record> records = new ArrayList<>(); + assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records)); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .build(); + + context.commitOffset("group-id", "topic", 0, 100L, 0); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.empty()); + + List<Record> records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testCleanupExpiredOffsets() { Review Comment: Here we basically test the default condition, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org