dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1515850376
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + /** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ + private boolean validateOfflineUpgrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CLASSIC && group.isEmpty(); + } + + /** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ + private boolean validateOfflineDowngrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CONSUMER && group.isEmpty(); + } + + /** + * Upgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ + public void maybeUpgradeEmptyGroup(String groupId, List<Record> records) { + if (validateOfflineUpgrade(groupId)) { + deleteGroup(groupId, records); + removeGroup(groupId); Review Comment: nit: Those two methods next to each others look a bit weird. I wonder if we should rename `deleteGroup` to `createGroupTombstoneRecords`. Another thing is that we actually get the group from the map in each method called here. I wonder if we should inline and simplify it. For instance, we could lookup the group as the first thing in this method, check the condition, then call `createGroupTombstoneRecords` on the group and finally remove the group from the map. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + /** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ + private boolean validateOfflineUpgrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CLASSIC && group.isEmpty(); + } + + /** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ + private boolean validateOfflineDowngrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CONSUMER && group.isEmpty(); + } + + /** + * Upgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ + public void maybeUpgradeEmptyGroup(String groupId, List<Record> records) { + if (validateOfflineUpgrade(groupId)) { Review Comment: nit: I wonder if naming it `isEmptyClassicGroup` would be better. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + /** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ + private boolean validateOfflineUpgrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CLASSIC && group.isEmpty(); + } + + /** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ + private boolean validateOfflineDowngrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CONSUMER && group.isEmpty(); + } + + /** + * Upgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ + public void maybeUpgradeEmptyGroup(String groupId, List<Record> records) { Review Comment: nit: Could we keep it private? It does not seems necessary to expose it. I also wonder if we should name it `maybeDeleteEmptyClassicGroup`. What do you think? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(0, result.response().errorCode()); + assertEquals(Group.GroupType.CONSUMER, + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type()); Review Comment: Should we verify the records generated here? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); Review Comment: This feels a bit weird in this test as it does not align with its name. How about putting it in a separate test? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(0, result.response().errorCode()); + assertEquals(Group.GroupType.CONSUMER, + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type()); + } + + @Test + public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { + String consumerGroupId = "consumer-group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); Review Comment: Similarly, I think that we should put the non empty case in a separate test case. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(0, result.response().errorCode()); + assertEquals(Group.GroupType.CONSUMER, + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type()); + } + + @Test + public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { + String consumerGroupId = "consumer-group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); Review Comment: An empty group should have the following records: * GroupEpoch * GroupSubscriptionMetadata (empty) * TargetAssignmentEpoch ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3500,6 +3503,56 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + /** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ + private boolean validateOfflineUpgrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CLASSIC && group.isEmpty(); + } + + /** + * A group can be downgraded offline if it's a consumer group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline downgrade is valid. + */ + private boolean validateOfflineDowngrade(String groupId) { + Group group = groups.get(groupId); + return group != null && group.type() == CONSUMER && group.isEmpty(); + } + + /** + * Upgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ + public void maybeUpgradeEmptyGroup(String groupId, List<Record> records) { + if (validateOfflineUpgrade(groupId)) { + deleteGroup(groupId, records); + removeGroup(groupId); + } + } + + /** + * Downgrade the empty group if it's valid. + * + * @param groupId The group id to be migrated. + * @param records The list of records to delete the previous group. + */ + public void maybeDowngradeEmptyGroup(String groupId, List<Record> records) { Review Comment: The same comments apply to this one. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( Review Comment: Talking about records, I was actually wondering how their replay could work in this case. My understanding is that (1) we remove the classic group from the map directly, (2) then create the consumer group, (3) then replay the classic group deletion tombstone and (4) finally replay the consumer group records. I was expecting (3) to fail because the classic group is not there anymore. It actually works because the deletion does not check the group type in this case. I wonder if we should strengthen this path. For consumer group, the replay of tombstones only work if the group type is correct. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { Review Comment: `testMaybeUpgradeEmptyGroup` and `testMaybeDowngradeEmptyGroup` seem a bit redundant now that we have the others. Do they still bring something? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList())); + assertEquals(0, result.response().errorCode()); + assertEquals(Group.GroupType.CONSUMER, + context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, false).type()); + } + + @Test + public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception { + String consumerGroupId = "consumer-group-id"; + String memberId = Uuid.randomUuid().toString(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(consumerGroupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request); + assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult1.joinFuture.get().errorCode()); + + // Remove the member. The consumer group becomes empty. + context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(consumerGroupId, memberId)); + context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(consumerGroupId, memberId)); + + GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request, true); + assertNotEquals(Errors.GROUP_ID_NOT_FOUND.code(), joinResult2.joinFuture.get().errorCode()); + assertEquals(Group.GroupType.CLASSIC, + context.groupMetadataManager.getOrMaybeCreateClassicGroup(consumerGroupId, false).type()); Review Comment: Should we assert the records here too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org