dongnuo123 commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1516784744
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9607,6 +9607,151 @@ public void testOnConsumerGroupStateTransitionOnLoading() { verify(context.metrics, times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY, null); } + @Test + public void testMaybeUpgradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + // A consumer group can't be upgraded. + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeUpgradeEmptyGroup(consumerGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A non-empty classic group can't be upgraded. + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty classic group can be upgraded. 
+ context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + context.groupMetadataManager.maybeUpgradeEmptyGroup(classicGroupId, records); + assertEquals(Arrays.asList(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId)), records); + } + + @Test + public void testMaybeDowngradeEmptyGroup() { + String classicGroupId = "classic-group-id"; + String consumerGroupId = "consumer-group-id"; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 10)); + + List<Record> records = new ArrayList<>(); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // A classic group can't be downgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + + // An empty consumer group can be upgraded. + context.groupMetadataManager.maybeDowngradeEmptyGroup(consumerGroupId, records); + assertEquals(Arrays.asList( + RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId), + RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)), records); + records.clear(); + + // A non-empty consumer group can't be downgraded. 
+ ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(Uuid.randomUuid().toString()); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, memberBuilder.build())); + context.groupMetadataManager.maybeDowngradeEmptyGroup(classicGroupId, records); + assertEquals(Collections.emptyList(), records); + } + + @Test + public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { + String classicGroupId = "classic-group-id"; + String memberId = Uuid.randomUuid().toString(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + assignor.prepareGroupAssignment(new GroupAssignment(Collections.emptyMap())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + ClassicGroup classicGroup = new ClassicGroup( + new LogContext(), + classicGroupId, + EMPTY, + context.time, + context.metrics + ); + context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, classicGroup.groupAssignment(), MetadataVersion.latestTesting())); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(PREPARING_REBALANCE); + assertThrows(GroupIdNotFoundException.class, () -> + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(classicGroupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + + context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, false).transitionTo(EMPTY); + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( Review Comment: Maybe, when replaying a GroupMetadataTombstone record, we could remove the group only if it is a classic group. It's also a bit strange, but I can't think of another way.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org