dongnuo123 commented on code in PR #15546:
URL: https://github.com/apache/kafka/pull/15546#discussion_r1529010875


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9298,6 +9298,120 @@ public void 
testOnConsumerGroupStateTransitionOnLoading() {
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertEquals(0, result.response().errorCode());
+        
assertEquals(RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), 
result.records().get(0));
+        assertEquals(Group.GroupType.CONSUMER,
+            
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(classicGroupId, 
false).type());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws 
Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+            .withMember(new ConsumerGroupMember.Builder(memberId)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .build()))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), 
joinResult.joinFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request, true);
+
+        List<Record> expectedRecords = Arrays.asList(
+            
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)
+        );
+
+        assertNotEquals(Errors.GROUP_ID_NOT_FOUND.code(), 
joinResult.joinFuture.get().errorCode());

Review Comment:
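   The join response here actually returns `MEMBER_ID_REQUIRED`. Let me replace
   the `assertNotEquals` on `GROUP_ID_NOT_FOUND` with an `assertEquals` that
   checks for `MEMBER_ID_REQUIRED` explicitly.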



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to