jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1270901334


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5524,5 +5670,2897 @@ private static Record newGroupMetadataRecord(
             )
         );
     }
+
+    @Test
+    public void testNewMemberTimeoutCompletion() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(context.genericGroupNewMemberJoinTimeoutMs + 
5000)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(joinRequest, joinFuture);
+        assertTrue(result.records().isEmpty());
+        assertFalse(joinFuture.isDone());
+
+        
context.sleepAndAssertEmptyResult(context.genericGroupInitialRebalanceDelayMs);
+
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), joinFuture.get().errorCode());
+
+        assertEquals(0, 
group.allMembers().stream().filter(GenericGroupMember::isNew).count());
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(joinFuture.get().memberId())
+            .withGenerationId(joinFuture.get().generationId())
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals(
+            Collections.singletonList(newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertEquals(1, group.size());
+
+        // Make sure the NewMemberTimeout is not still in effect, and the 
member is not kicked
+        
context.sleepAndAssertEmptyResult(context.genericGroupNewMemberJoinTimeoutMs);
+        assertEquals(1, group.size());
+
+        // Member should be removed as heartbeat expires.
+        List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(5000);
+        List<Record> expectedRecords = 
Collections.singletonList(newGroupMetadataRecord("group-id",
+                new GroupMetadataValue()
+                    .setMembers(Collections.emptyList())
+                    .setGeneration(2)
+                    .setLeader(null)
+                    .setProtocolType("consumer")
+                    .setProtocol(null)
+                    .setCurrentStateTimestamp(context.time.milliseconds()),
+                MetadataVersion.latest()));
+
+        assertEquals(1, timeouts.size());
+        String memberId = joinFuture.get().memberId();
+        timeouts.forEach(timeout -> {
+            assertEquals(genericGroupHeartbeatKey("group-id", memberId), 
timeout.key);
+            assertEquals(expectedRecords, timeout.result.records());
+        });
+
+        assertEquals(0, group.size());
+    }
+
+    @Test
+    public void testNewMemberFailureAfterJoinGroupCompletion() throws 
Exception {
+        // For old versions of the JoinGroup protocol, new members were subject
+        // to expiration if the rebalance took long enough. This test case 
ensures
+        // that following completion of the JoinGroup phase, new members follow
+        // normal heartbeat expiration logic.
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withSessionTimeoutMs(5000)
+            .withRebalanceTimeoutMs(10000)
+            .build();
+
+        JoinGroupResponseData joinResponse = 
context.joinGenericGroupAndCompleteJoin(joinRequest, false, false);
+        assertEquals(Errors.NONE.code(), joinResponse.errorCode());
+
+        String memberId = joinResponse.memberId();
+        assertEquals(memberId, joinResponse.leader());
+        assertEquals(1, joinResponse.generationId());
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(memberId)
+            .withGenerationId(1)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncResponseFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncResponseFuture);
+
+        assertEquals(
+            Collections.singletonList(newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncResponseFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncResponseFuture.get().errorCode());
+
+        assertTrue(group.isInState(STABLE));
+        assertEquals(1, group.generationId());
+
+        CompletableFuture<JoinGroupResponseData> otherJoinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> otherJoinResult = 
context.sendGenericGroupJoin(
+            joinRequest.setMemberId(UNKNOWN_MEMBER_ID),
+            otherJoinFuture);
+
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> joinResult = 
context.sendGenericGroupJoin(
+            joinRequest.setMemberId(memberId),
+            joinFuture);
+
+        assertTrue(otherJoinResult.records().isEmpty());
+        assertTrue(joinResult.records().isEmpty());
+        assertTrue(joinFuture.isDone());
+        assertTrue(otherJoinFuture.isDone());
+
+        verifySessionExpiration(context, group, 5000);
+    }
+
+    @Test
+    public void testStaticMemberFenceDuplicateRejoinedFollower() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        // A third member joins. Trigger a rebalance.
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        context.sendGenericGroupJoin(request, new CompletableFuture<>());
+
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Old follower rejoins group will be matching current member.id.
+        CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+            request
+                .setMemberId(rebalanceResult.followerId)
+                .setGroupInstanceId("follower-instance-id"),
+            oldFollowerJoinFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertFalse(oldFollowerJoinFuture.isDone());
+
+        // Duplicate follower joins group with unknown member id will trigger 
member id replacement.
+        result = context.sendGenericGroupJoin(
+            
request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"),
+            new CompletableFuture<>());
+
+        // Old member shall be fenced immediately upon duplicate follower 
joins.
+        assertTrue(result.records().isEmpty());
+        assertTrue(oldFollowerJoinFuture.isDone());
+
+        JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
+            .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+            .setProtocolName(null)
+            .setProtocolType(null)
+            .setLeader(UNKNOWN_MEMBER_ID)
+            .setMemberId(rebalanceResult.followerId)
+            .setGenerationId(-1);
+
+        checkJoinGroupResponse(
+            expectedResponse,
+            oldFollowerJoinFuture.get(),
+            group,
+            PREPARING_REBALANCE,
+            Collections.emptySet()
+        );
+    }
+
+    @Test
+    public void 
testStaticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        // Known leader rejoins will trigger rebalance.
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withDefaultProtocolTypeAndProtocols()

Review Comment:
   It would be if it were the follower. I updated it to `withProtocolSuperset()` 
to be precise.
   
   A JoinGroup request from the leader while the group is in the stable state 
triggers a rebalance to update the assignment:
   
   ```
                           // Force a rebalance if the leader sends JoinGroup;
                           // This allows the leader to trigger rebalances for 
changes affecting assignment
                           // which do not affect the member metadata (such as 
topic metadata changes for the consumer)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to