jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276959521


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
             this.followerAssignment = followerAssignment;
         }
     }
+
+    @Test
+    public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withGenerationId(rebalanceResult.generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(rebalanceResult.leaderId)
+            .setGenerationId(rebalanceResult.generationId);
+
+        HeartbeatResponseData validHeartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode());
+
+        HeartbeatResponseData inValidHeartbeatResponse = 
context.sendGenericGroupHeartbeat(
+            heartbeatRequest.setGroupInstanceId("leader-instance-id")
+                .setMemberId("invalid-member-id"));
+
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
inValidHeartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDeadGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        group.transitionTo(DEAD);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatEmptyGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{0}));
+
+        group.add(new GenericGroupMember(
+            "member-id",
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            10000,
+            5000,
+            "consumer",
+            protocols
+        ));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY);
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownMemberExistingGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("unknown-member-id")
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDuringPreparingRebalance() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(joinRequest, joinFuture, true);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
joinFuture.get().errorCode());
+
+        String memberId = joinFuture.get().memberId();
+
+        joinFuture = new CompletableFuture<>();
+        context.sendGenericGroupJoin(joinRequest.setMemberId(memberId), 
joinFuture);
+
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(memberId)
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDuringCompletingRebalance() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(new HeartbeatResponseData(), heartbeatResponse);
+    }
+
+    @Test
+    public void testHeartbeatIllegalGeneration() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId + 1);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.ILLEGAL_GENERATION.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testValidHeartbeat() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testGenericGroupMemberSessionTimeout() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()

Review Comment:
   Done. named `joinGenericGroupAsDynamicMemberAndCompleteRebalance()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to