jeffkbkim commented on code in PR #14017: URL: https://github.com/apache/kafka/pull/14017#discussion_r1272852347
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -5524,5 +5670,2897 @@ private static Record newGroupMetadataRecord( ) ); } + + @Test + public void testNewMemberTimeoutCompletion() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .withSessionTimeoutMs(context.genericGroupNewMemberJoinTimeoutMs + 5000) + .build(); + + CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(joinRequest, joinFuture); + assertTrue(result.records().isEmpty()); + assertFalse(joinFuture.isDone()); + + context.sleepAndAssertEmptyResult(context.genericGroupInitialRebalanceDelayMs); + + assertTrue(joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinFuture.get().errorCode()); + + assertEquals(0, group.allMembers().stream().filter(GenericGroupMember::isNew).count()); + + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(joinFuture.get().memberId()) + .withGenerationId(joinFuture.get().generationId()) + .build(); + + CompletableFuture<SyncGroupResponseData> syncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync(syncRequest, syncFuture); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate a successful write to the log. 
+ result.appendFuture().complete(null); + + assertTrue(syncFuture.isDone()); + assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + assertEquals(1, group.size()); + + // Make sure the NewMemberTimeout is not still in effect, and the member is not kicked + context.sleepAndAssertEmptyResult(context.genericGroupNewMemberJoinTimeoutMs); + assertEquals(1, group.size()); + + // Member should be removed as heartbeat expires. + List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(5000); + List<Record> expectedRecords = Collections.singletonList(newGroupMetadataRecord("group-id", + new GroupMetadataValue() + .setMembers(Collections.emptyList()) + .setGeneration(2) + .setLeader(null) + .setProtocolType("consumer") + .setProtocol(null) + .setCurrentStateTimestamp(context.time.milliseconds()), + MetadataVersion.latest())); + + assertEquals(1, timeouts.size()); + String memberId = joinFuture.get().memberId(); + timeouts.forEach(timeout -> { + assertEquals(genericGroupHeartbeatKey("group-id", memberId), timeout.key); + assertEquals(expectedRecords, timeout.result.records()); + }); + + assertEquals(0, group.size()); + } + + @Test + public void testNewMemberFailureAfterJoinGroupCompletion() throws Exception { + // For old versions of the JoinGroup protocol, new members were subject + // to expiration if the rebalance took long enough. This test case ensures + // that following completion of the JoinGroup phase, new members follow + // normal heartbeat expiration logic. 
+ + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .withSessionTimeoutMs(5000) + .withRebalanceTimeoutMs(10000) + .build(); + + JoinGroupResponseData joinResponse = context.joinGenericGroupAndCompleteJoin(joinRequest, false, false); + assertEquals(Errors.NONE.code(), joinResponse.errorCode()); + + String memberId = joinResponse.memberId(); + assertEquals(memberId, joinResponse.leader()); + assertEquals(1, joinResponse.generationId()); + + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withGenerationId(1) + .build(); + + CompletableFuture<SyncGroupResponseData> syncResponseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupSync(syncRequest, syncResponseFuture); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate a successful write to the log. 
+ result.appendFuture().complete(null); + + assertTrue(syncResponseFuture.isDone()); + assertEquals(Errors.NONE.code(), syncResponseFuture.get().errorCode()); + + assertTrue(group.isInState(STABLE)); + assertEquals(1, group.generationId()); + + CompletableFuture<JoinGroupResponseData> otherJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> otherJoinResult = context.sendGenericGroupJoin( + joinRequest.setMemberId(UNKNOWN_MEMBER_ID), + otherJoinFuture); + + CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> joinResult = context.sendGenericGroupJoin( + joinRequest.setMemberId(memberId), + joinFuture); + + assertTrue(otherJoinResult.records().isEmpty()); + assertTrue(joinResult.records().isEmpty()); + assertTrue(joinFuture.isDone()); + assertTrue(otherJoinFuture.isDone()); + + verifySessionExpiration(context, group, 5000); + } + + @Test + public void testStaticMemberFenceDuplicateRejoinedFollower() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A third member joins. Trigger a rebalance. + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + context.sendGenericGroupJoin(request, new CompletableFuture<>()); + + assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Old follower rejoins group will be matching current member.id. 
+ CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request + .setMemberId(rebalanceResult.followerId) + .setGroupInstanceId("follower-instance-id"), + oldFollowerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(oldFollowerJoinFuture.isDone()); + + // Duplicate follower joins group with unknown member id will trigger member id replacement. + result = context.sendGenericGroupJoin( + request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"), + new CompletableFuture<>()); + + // Old member shall be fenced immediately upon duplicate follower joins. + assertTrue(result.records().isEmpty()); + assertTrue(oldFollowerJoinFuture.isDone()); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.FENCED_INSTANCE_ID.code()) + .setProtocolName(null) + .setProtocolType(null) + .setLeader(UNKNOWN_MEMBER_ID) + .setMemberId(rebalanceResult.followerId) + .setGenerationId(-1); + + checkJoinGroupResponse( + expectedResponse, + oldFollowerJoinFuture.get(), + group, + PREPARING_REBALANCE, + Collections.emptySet() + ); + } + + @Test + public void testStaticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // Known leader rejoins will trigger rebalance. 
+ JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withDefaultProtocolTypeAndProtocols() + .withRebalanceTimeoutMs(10000) + .build(); + + CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, leaderJoinFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(leaderJoinFuture.isDone()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Old follower rejoins group will match current member.id. + CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setMemberId(rebalanceResult.followerId).setGroupInstanceId("follower-instance-id"), + oldFollowerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertTrue(oldFollowerJoinFuture.isDone()); + assertTrue(leaderJoinFuture.isDone()); + + JoinGroupResponseData expectedLeaderResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(rebalanceResult.leaderId) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setMembers(toJoinResponseMembers(group)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + checkJoinGroupResponse( + expectedLeaderResponse, + leaderJoinFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + + assertEquals(rebalanceResult.leaderId, leaderJoinFuture.get().memberId()); + assertEquals(rebalanceResult.leaderId, leaderJoinFuture.get().leader()); + + // Old follower should get a successful join group response. 
+ assertTrue(oldFollowerJoinFuture.isDone()); + + JoinGroupResponseData expectedFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(oldFollowerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer"); + + checkJoinGroupResponse( + expectedFollowerResponse, + oldFollowerJoinFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(rebalanceResult.followerId, oldFollowerJoinFuture.get().memberId()); + assertEquals(rebalanceResult.leaderId, oldFollowerJoinFuture.get().leader()); + + // Duplicate follower joins group with unknown member id will trigger member.id replacement, + // and will also trigger a rebalance under CompletingRebalance state; the old follower sync callback + // will return fenced exception while broker replaces the member identity with the duplicate follower joins. 
+ SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withGenerationId(oldFollowerJoinFuture.get().generationId()) + .withMemberId(oldFollowerJoinFuture.get().memberId()) + .build(); + + CompletableFuture<SyncGroupResponseData> oldFollowerSyncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync(syncRequest, oldFollowerSyncFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(oldFollowerSyncFuture.isDone()); + + CompletableFuture<JoinGroupResponseData> duplicateFollowerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"), + duplicateFollowerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + assertFalse(duplicateFollowerJoinFuture.isDone()); + assertTrue(oldFollowerSyncFuture.isDone()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), oldFollowerSyncFuture.get().errorCode()); + + // Advance clock by rebalance timeout so that the join phase completes with duplicate follower. + // Leader is kicked out. 
+ context.sleepAndAssertEmptyResult(10000); + + assertTrue(duplicateFollowerJoinFuture.isDone()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(3, group.generationId()); + assertEquals(1, group.size()); + assertTrue(group.hasMemberId(duplicateFollowerJoinFuture.get().memberId())); + assertEquals(duplicateFollowerJoinFuture.get().memberId(), duplicateFollowerJoinFuture.get().leader()); + } + + @Test + public void testStaticMemberFenceDuplicateRejoiningFollowerAfterMemberIdChanged() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // Known leader rejoins will trigger rebalance. + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withDefaultProtocolTypeAndProtocols() + .withRebalanceTimeoutMs(10000) + .build(); + + CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, leaderJoinFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(leaderJoinFuture.isDone()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Duplicate follower joins group will trigger member id replacement. + CompletableFuture<JoinGroupResponseData> duplicateFollowerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"), + duplicateFollowerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertTrue(duplicateFollowerJoinFuture.isDone()); + + // Old follower rejoins group will fail because member id is already updated. 
+ CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setMemberId(rebalanceResult.followerId), + oldFollowerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertTrue(oldFollowerJoinFuture.isDone()); + assertTrue(leaderJoinFuture.isDone()); + + JoinGroupResponseData expectedLeaderResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(rebalanceResult.leaderId) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setMembers(toJoinResponseMembers(group)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + checkJoinGroupResponse( + expectedLeaderResponse, + leaderJoinFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + + assertTrue(duplicateFollowerJoinFuture.isDone()); + + JoinGroupResponseData expectedDuplicateFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(duplicateFollowerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedDuplicateFollowerResponse, + duplicateFollowerJoinFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + + assertTrue(duplicateFollowerJoinFuture.isDone()); + + JoinGroupResponseData expectedOldFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.FENCED_INSTANCE_ID.code()) + .setGenerationId(-1) + .setMemberId(rebalanceResult.followerId) + .setLeader(UNKNOWN_MEMBER_ID) + .setProtocolName(null) + .setProtocolType(null) + .setMembers(Collections.emptyList()); + + 
checkJoinGroupResponse( + expectedOldFollowerResponse, + oldFollowerJoinFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + } + + @Test + public void testStaticMemberRejoinWithKnownMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withGroupInstanceId("group-instance-id") + .withDefaultProtocolTypeAndProtocols() + .build(); + + JoinGroupResponseData joinResponse = context.joinGenericGroupAndCompleteJoin(request, false, false); + assertEquals(Errors.NONE.code(), joinResponse.errorCode()); + + String memberId = joinResponse.memberId(); + + CompletableFuture<JoinGroupResponseData> rejoinResponseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request.setMemberId(memberId), + rejoinResponseFuture); + + // The second join group should return immediately since we are using the same metadata during CompletingRebalance. + assertTrue(result.records().isEmpty()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertTrue(rejoinResponseFuture.isDone()); + assertEquals(Errors.NONE.code(), rejoinResponseFuture.get().errorCode()); + + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(memberId) + .withGenerationId(joinResponse.generationId()) + .withGroupInstanceId("group-instance-id") + .build(); + + CompletableFuture<SyncGroupResponseData> syncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync(syncRequest, syncFuture); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Successful write to the log. 
+ result.appendFuture().complete(null); + + assertTrue(syncFuture.isDone()); + assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + assertTrue(group.isInState(STABLE)); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testStaticMemberRejoinWithLeaderIdAndUnknownMemberId( + boolean supportSkippingAssignment + ) throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A static leader rejoin with unknown id will not trigger rebalance, and no assignment will be returned. + // As the group was in Stable state and the member id was updated, this will generate records. + JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + CompletableFuture<JoinGroupResponseData> joinResponseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + joinRequest, + joinResponseFuture, + true, + supportSkippingAssignment); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate a successful write to the log. + result.appendFuture().complete(null); + assertTrue(joinResponseFuture.isDone()); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + String leader = supportSkippingAssignment ? + joinResponseFuture.get().memberId() : rebalanceResult.leaderId; + + List<JoinGroupResponseMember> members = supportSkippingAssignment ? 
+ toJoinResponseMembers(group) : Collections.emptyList(); + + JoinGroupResponseData expectedJoinResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId) + .setMemberId(joinResponseFuture.get().memberId()) + .setLeader(leader) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(supportSkippingAssignment) + .setMembers(members); + + checkJoinGroupResponse( + expectedJoinResponse, + joinResponseFuture.get(), + group, + STABLE, + supportSkippingAssignment ? expectedGroupInstanceIds : Collections.emptySet() + ); + } + + @Test + public void testStaticMemberRejoinWithLeaderIdAndKnownMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // Known static leader rejoin will trigger rebalance. 
+ JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withDefaultProtocolTypeAndProtocols() + .withRebalanceTimeoutMs(10000) + .build(); + + JoinGroupResponseData joinResponse = context.joinGenericGroupAndCompleteJoin(request, true, true, 10000); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(rebalanceResult.leaderId) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setMembers(toJoinResponseMembers(group)); + + checkJoinGroupResponse( + expectedResponse, + joinResponse, + group, + COMPLETING_REBALANCE, + Collections.singleton("leader-instance-id") + ); + } + + @Test + public void testStaticMemberRejoinWithLeaderIdAndUnexpectedDeadGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + group.transitionTo(DEAD); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, joinFuture, true, true); + + assertTrue(result.records().isEmpty()); + assertTrue(joinFuture.isDone()); + assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), joinFuture.get().errorCode()); + } + + @Test + public void testStaticMemberRejoinWithLeaderIdAndUnexpectedEmptyGroup() throws 
Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(EMPTY); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, joinFuture, true, true); + + assertTrue(result.records().isEmpty()); + assertTrue(joinFuture.isDone()); + assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), joinFuture.get().errorCode()); + } + + @Test + public void testStaticMemberRejoinWithFollowerIdAndChangeOfProtocol() throws Exception { + int rebalanceTimeoutMs = 10000; + int sessionTimeoutMs = 15000; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id", + rebalanceTimeoutMs, + sessionTimeoutMs + ); + + // A static follower rejoin with changed protocol will trigger rebalance. 
+ JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0); + protocols.add(new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array()) + ); + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(rebalanceResult.followerId) + .withProtocols(protocols) + .withRebalanceTimeoutMs(rebalanceTimeoutMs) + .withSessionTimeoutMs(sessionTimeoutMs) + .build(); + + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(responseFuture.isDone()); + + // Old leader hasn't joined in the meantime, triggering a re-election. + context.sleepAndAssertEmptyResult(rebalanceTimeoutMs); + + assertTrue(responseFuture.isDone()); + assertEquals(Errors.NONE.code(), responseFuture.get().errorCode()); + assertTrue(group.hasStaticMember("leader-instance-id")); + assertTrue(group.isLeader(rebalanceResult.followerId)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(rebalanceResult.followerId) + .setLeader(rebalanceResult.followerId) + .setProtocolName("roundrobin") + .setProtocolType("consumer") + .setMembers(toJoinResponseMembers(group)); + + checkJoinGroupResponse( + expectedResponse, + responseFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + } + + @Test + public void 
testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWithSelectedProtocolChanged() + throws Exception { + + int rebalanceTimeoutMs = 10000; + int sessionTimeoutMs = 15000; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id", + rebalanceTimeoutMs, + sessionTimeoutMs + ); + + assertNotEquals("roundrobin", group.selectProtocol()); + + // A static follower rejoin with changed protocol will trigger rebalance. + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0); + protocols.add(new JoinGroupRequestProtocol() + .setName("roundrobin") + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array()) + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(protocols) + .withRebalanceTimeoutMs(rebalanceTimeoutMs) + .withSessionTimeoutMs(sessionTimeoutMs) + .build(); + + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture); + assertTrue(result.records().isEmpty()); + assertFalse(responseFuture.isDone()); + assertEquals("roundrobin", group.selectProtocol()); + + // Old leader hasn't joined in the meantime, triggering a re-election. 
+ context.sleepAndAssertEmptyResult(rebalanceTimeoutMs); + assertTrue(responseFuture.isDone()); + assertEquals(Errors.NONE.code(), responseFuture.get().errorCode()); + assertTrue(group.hasStaticMember("leader-instance-id")); + assertTrue(group.isLeader(responseFuture.get().memberId())); + assertNotEquals(rebalanceResult.followerId, responseFuture.get().memberId()); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(responseFuture.get().memberId()) + .setLeader(responseFuture.get().memberId()) + .setProtocolName("roundrobin") + .setProtocolType("consumer") + .setMembers(toJoinResponseMembers(group)); + + checkJoinGroupResponse( + expectedResponse, + responseFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + } + + @Test + public void testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchangedPersistenceFailure() + throws Exception { + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + String selectedProtocol = group.selectProtocol(); + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0); + protocols.add(new JoinGroupRequestProtocol() + .setName(selectedProtocol) + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array()) + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + 
.withGroupInstanceId("follower-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(protocols) + .build(); + + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + followerJoinFuture, + true, + true); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate a failed write to the log. + result.appendFuture().completeExceptionally(Errors.MESSAGE_TOO_LARGE.exception()); + assertTrue(followerJoinFuture.isDone()); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGenerationId(rebalanceResult.generationId) + .setMemberId(followerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedResponse, + followerJoinFuture.get(), + group, + STABLE, + Collections.emptySet() + ); + + // Join with old member id will not fail because the member id is not updated because of persistence failure + assertNotEquals(rebalanceResult.followerId, followerJoinFuture.get().memberId()); + followerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin(request.setMemberId(rebalanceResult.followerId), followerJoinFuture); + + assertTrue(result.records().isEmpty()); + + // Join with leader and complete join phase. 
+ CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setGroupInstanceId("leader-instance-id") + .setMemberId(rebalanceResult.leaderId), + leaderJoinFuture + ); + + assertTrue(result.records().isEmpty()); + assertTrue(leaderJoinFuture.isDone()); + assertTrue(followerJoinFuture.isDone()); + assertEquals(Errors.NONE.code(), leaderJoinFuture.get().errorCode()); + assertEquals(Errors.NONE.code(), followerJoinFuture.get().errorCode()); + + // Sync with leader and receive assignment. + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withGenerationId(rebalanceResult.generationId + 1) + .build(); + + CompletableFuture<SyncGroupResponseData> leaderSyncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync(syncRequest, leaderSyncFuture); + + // Simulate a successful write to the log. This will update the group with the new (empty) assignment. 
+ result.appendFuture().complete(null); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + + assertTrue(leaderSyncFuture.isDone()); + assertTrue(group.isInState(STABLE)); + assertEquals(Errors.NONE.code(), leaderSyncFuture.get().errorCode()); + + // Sync with old member id will also not fail because the member id is not updated because of persistence failure + CompletableFuture<SyncGroupResponseData> oldMemberSyncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync( + syncRequest + .setGroupInstanceId("follower-instance-id") + .setMemberId(rebalanceResult.followerId), + oldMemberSyncFuture + ); + assertTrue(result.records().isEmpty()); + assertTrue(oldMemberSyncFuture.isDone()); + assertTrue(group.isInState(STABLE)); + assertEquals(Errors.NONE.code(), oldMemberSyncFuture.get().errorCode()); + } + + @Test + public void testStaticMemberRejoinWithUnknownMemberIdAndChangeOfProtocolWhileSelectProtocolUnchanged() + throws Exception { + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A static follower rejoin with protocol changing to leader protocol subset won't trigger rebalance if updated + // group's selectProtocol remain unchanged. 
+ String selectedProtocol = group.selectProtocol(); + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(0); + protocols.add(new JoinGroupRequestProtocol() + .setName(selectedProtocol) + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array()) + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocols(protocols) + .build(); + + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + followerJoinFuture, + true, + true); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + + // Simulate a successful write to the log. + result.appendFuture().complete(null); + assertTrue(followerJoinFuture.isDone()); + + JoinGroupResponseData expectedResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId) + .setMemberId(followerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedResponse, + followerJoinFuture.get(), + group, + STABLE, + Collections.emptySet() + ); + + // Join with old member id will fail because the member id is updated + String newFollowerId = followerJoinFuture.get().memberId(); + assertNotEquals(rebalanceResult.followerId, newFollowerId); + followerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin(request.setMemberId(rebalanceResult.followerId), followerJoinFuture); + + assertTrue(result.records().isEmpty()); + 
assertEquals(Errors.FENCED_INSTANCE_ID.code(), followerJoinFuture.get().errorCode()); + + // Sync with old member id will fail because the member id is updated + CompletableFuture<SyncGroupResponseData> syncFuture = new CompletableFuture<>(); + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withGenerationId(rebalanceResult.generationId) + .withMemberId(rebalanceResult.followerId) + .withAssignment(Collections.emptyList()) + .build(); + + CoordinatorResult<Void, Record> syncResult = context.sendGenericGroupSync(syncRequest, syncFuture); + + assertTrue(syncResult.records().isEmpty()); + assertTrue(syncFuture.isDone()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), syncFuture.get().errorCode()); + + + // Sync with new member id succeeds + syncFuture = new CompletableFuture<>(); + syncResult = context.sendGenericGroupSync( + syncRequest.setMemberId(newFollowerId), + syncFuture + ); + + assertTrue(syncResult.records().isEmpty()); + assertTrue(syncFuture.isDone()); + assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + assertEquals(rebalanceResult.followerAssignment, syncFuture.get().assignment()); + } + + @Test + public void testStaticMemberRejoinWithKnownLeaderIdToTriggerRebalanceAndFollowerWithChangeofProtocol() + throws Exception { + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A static leader rejoin with known member id will trigger rebalance. 
+ JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withProtocolSuperset() + .withRebalanceTimeoutMs(10000) + .withSessionTimeoutMs(5000) + .build(); + + CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + leaderJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(leaderJoinFuture.isDone()); + + // Rebalance completes immediately after follower rejoins. + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setGroupInstanceId("follower-instance-id") + .setMemberId(rebalanceResult.followerId), + followerJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertTrue(leaderJoinFuture.isDone()); + assertTrue(followerJoinFuture.isDone()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + assertEquals(2, group.generationId()); + + // Leader should get the same assignment as last round. + JoinGroupResponseData expectedLeaderResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) // The group has promoted to the new generation. 
+ .setMemberId(leaderJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(toJoinResponseMembers(group)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + + checkJoinGroupResponse( + expectedLeaderResponse, + leaderJoinFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + + JoinGroupResponseData expectedFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) // The group has promoted to the new generation. + .setMemberId(followerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedFollowerResponse, + followerJoinFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + + // The follower protocol changed from protocolSuperset to general protocols. + JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestProtocolCollection(); + protocols.add(new JoinGroupRequestProtocol() + .setName("range") + .setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription( + Collections.singletonList("foo"))).array())); + + followerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setGroupInstanceId("follower-instance-id") + .setMemberId(rebalanceResult.followerId) + .setProtocols(protocols), + followerJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(followerJoinFuture.isDone()); + // The group will transition to PreparingRebalance due to protocol change from follower. 
+ assertTrue(group.isInState(PREPARING_REBALANCE)); + + // Advance clock by session timeout to kick leader out and complete join phase. + List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(5000); + // Both leader and follower heartbeat timers may expire. However, the follower heartbeat expiration + // will not kick the follower out because it is awaiting a join response. + assertTrue(timeouts.size() <= 2); + assertTrue(followerJoinFuture.isDone()); + + String newFollowerId = followerJoinFuture.get().memberId(); + expectedFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 2) // The group has promoted to the new generation. + .setMemberId(newFollowerId) + .setLeader(newFollowerId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(toJoinResponseMembers(group)); + + checkJoinGroupResponse( + expectedFollowerResponse, + followerJoinFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.singleton("follower-instance-id") + ); + } + + @Test + public void testStaticMemberRejoinAsFollowerWithUnknownMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A static follower rejoin with no protocol change will not trigger rebalance. 
+ JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + followerJoinFuture, + true, + true); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate successful write to log. + result.appendFuture().complete(null); + + assertTrue(followerJoinFuture.isDone()); + + // Old leader shouldn't be timed out. + assertTrue(group.hasStaticMember("leader-instance-id")); + + JoinGroupResponseData expectedFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId) // The group has not changed. + .setMemberId(followerJoinFuture.get().memberId()) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedFollowerResponse, + followerJoinFuture.get(), + group, + STABLE, + Collections.emptySet() + ); + assertNotEquals(rebalanceResult.followerId, followerJoinFuture.get().memberId()); + + CompletableFuture<SyncGroupResponseData> syncFuture = new CompletableFuture<>(); + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withGenerationId(rebalanceResult.generationId) + .withMemberId(followerJoinFuture.get().memberId()) + .build(); + + result = context.sendGenericGroupSync(syncRequest, syncFuture); + + assertTrue(result.records().isEmpty()); + assertTrue(syncFuture.isDone()); + assertEquals(Errors.NONE.code(), syncFuture.get().errorCode()); + 
assertEquals(rebalanceResult.followerAssignment, syncFuture.get().assignment()); + } + + @Test + public void testStaticMemberRejoinAsFollowerWithKnownMemberIdAndNoProtocolChange() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + // A static follower rejoin with no protocol change will not trigger rebalance. + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(rebalanceResult.followerId) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + followerJoinFuture, + true, + true); + + // No records to write because no metadata changed. + assertTrue(result.records().isEmpty()); + assertTrue(followerJoinFuture.isDone()); + + // Old leader shouldn't be timed out. + assertTrue(group.hasStaticMember("leader-instance-id")); + + JoinGroupResponseData expectedFollowerResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId) // The group has not changed. 
+ .setMemberId(rebalanceResult.followerId) + .setLeader(rebalanceResult.leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedFollowerResponse, + followerJoinFuture.get(), + group, + STABLE, + Collections.emptySet() + ); + } + + @Test + public void testStaticMemberRejoinAsFollowerWithMismatchedInstanceId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(rebalanceResult.followerId) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + followerJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertTrue(followerJoinFuture.isDone()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), followerJoinFuture.get().errorCode()); + } + + @Test + public void testStaticMemberRejoinAsLeaderWithMismatchedInstanceId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withProtocolSuperset() + .build(); + + 
CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + leaderJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertTrue(leaderJoinFuture.isDone()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), leaderJoinFuture.get().errorCode()); + } + + @Test + public void testStaticMemberSyncAsLeaderWithInvalidMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + SyncGroupRequestData request = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId("invalid-member-id") + .build(); + + CompletableFuture<SyncGroupResponseData> leaderSyncFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupSync( + request, + leaderSyncFuture + ); + + assertTrue(result.records().isEmpty()); + assertTrue(leaderSyncFuture.isDone()); + assertEquals(Errors.FENCED_INSTANCE_ID.code(), leaderSyncFuture.get().errorCode()); + } + + @Test + public void testGetDifferentStaticMemberIdAfterEachRejoin() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + String lastMemberId = rebalanceResult.leaderId; + for (int i = 0; i < 5; i++) { + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + 
CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + leaderJoinFuture, + true, + true); + + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate successful write to log. + result.appendFuture().complete(null); + assertTrue(leaderJoinFuture.isDone()); + assertEquals(group.staticMemberId("leader-instance-id"), leaderJoinFuture.get().memberId()); + assertEquals(Errors.NONE.code(), leaderJoinFuture.get().errorCode()); + assertNotEquals(lastMemberId, leaderJoinFuture.get().memberId()); + + lastMemberId = leaderJoinFuture.get().memberId(); + } + } + + @Test + public void testStaticMemberJoinWithUnknownInstanceIdAndKnownMemberId() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + context.createGenericGroup("group-id"); + + RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("unknown-instance-id") + .withMemberId(rebalanceResult.leaderId) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + joinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertTrue(joinFuture.isDone()); + assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), joinFuture.get().errorCode()); + } + + @Test + public void testStaticMemberReJoinWithIllegalStateAsUnknownMember() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + 
staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id" + ); + + group.transitionTo(PREPARING_REBALANCE); + group.transitionTo(EMPTY); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("follower-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + // Illegal state exception shall trigger since follower id resides in pending member bucket. + IllegalStateException exception = assertThrows(IllegalStateException.class, () -> context.sendGenericGroupJoin( + request, + new CompletableFuture<>(), + true, + true)); + + String message = exception.getMessage(); + assertTrue(message.contains(group.groupId())); + assertTrue(message.contains("follower-instance-id")); + } + + @Test + public void testStaticMemberFollowerFailToRejoinBeforeRebalanceTimeout() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + // Increase session timeout so that the follower won't be evicted when rebalance timeout is reached. 
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id", + 10000, + 15000 + ); + + String newMemberInstanceId = "new-member-instance-id"; + String leaderId = rebalanceResult.leaderId; + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId(newMemberInstanceId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> newMemberFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + newMemberFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(newMemberFuture.isDone()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setGroupInstanceId("leader-instance-id") + .setMemberId(leaderId), + leaderJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(leaderJoinFuture.isDone()); + + // Advance clock by rebalance timeout to complete join phase. 
+ assertNoOrEmptyResult(context.sleep(10000)); + + assertTrue(leaderJoinFuture.isDone()); + assertTrue(newMemberFuture.isDone()); + + JoinGroupResponseData expectedLeaderResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(leaderId) + .setLeader(leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(toJoinResponseMembers(group)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + expectedGroupInstanceIds.add(newMemberInstanceId); + + checkJoinGroupResponse( + expectedLeaderResponse, + leaderJoinFuture.get(), + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + + JoinGroupResponseData expectedNewMemberResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(newMemberFuture.get().memberId()) + .setLeader(leaderId) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedNewMemberResponse, + newMemberFuture.get(), + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + } + + @Test + public void testStaticMemberLeaderFailToRejoinBeforeRebalanceTimeout() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + // Increase session timeout so that the leader won't be evicted when rebalance timeout is reached. 
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance( + context, + "group-id", + "leader-instance-id", + "follower-instance-id", + 10000, + 15000 + ); + + String newMemberInstanceId = "new-member-instance-id"; + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId(newMemberInstanceId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> newMemberFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + request, + newMemberFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(newMemberFuture.isDone()); + assertTrue(group.isInState(PREPARING_REBALANCE)); + + CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin( + request.setGroupInstanceId("follower-instance-id") + .setMemberId(rebalanceResult.followerId), + oldFollowerJoinFuture, + true, + true); + + assertTrue(result.records().isEmpty()); + assertFalse(oldFollowerJoinFuture.isDone()); + + // Advance clock by rebalance timeout to complete join phase. + assertNoOrEmptyResult(context.sleep(10000)); + + assertTrue(oldFollowerJoinFuture.isDone()); + assertTrue(newMemberFuture.isDone()); + + JoinGroupResponseData newLeaderResponse = oldFollowerJoinFuture.get().leader() + .equals(oldFollowerJoinFuture.get().memberId()) ? oldFollowerJoinFuture.get() : newMemberFuture.get(); + + JoinGroupResponseData newFollowerResponse = oldFollowerJoinFuture.get().leader() + .equals(oldFollowerJoinFuture.get().memberId()) ? 
newMemberFuture.get() : oldFollowerJoinFuture.get(); + + JoinGroupResponseData expectedLeaderResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(newLeaderResponse.memberId()) + .setLeader(newLeaderResponse.memberId()) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(toJoinResponseMembers(group)); + + Set<String> expectedGroupInstanceIds = new HashSet<>(); + expectedGroupInstanceIds.add("leader-instance-id"); + expectedGroupInstanceIds.add("follower-instance-id"); + expectedGroupInstanceIds.add(newMemberInstanceId); + + checkJoinGroupResponse( + expectedLeaderResponse, + newLeaderResponse, + group, + COMPLETING_REBALANCE, + expectedGroupInstanceIds + ); + + JoinGroupResponseData expectedNewMemberResponse = new JoinGroupResponseData() + .setErrorCode(Errors.NONE.code()) + .setGenerationId(rebalanceResult.generationId + 1) + .setMemberId(newFollowerResponse.memberId()) + .setLeader(newLeaderResponse.memberId()) + .setProtocolName("range") + .setProtocolType("consumer") + .setSkipAssignment(false) + .setMembers(Collections.emptyList()); + + checkJoinGroupResponse( + expectedNewMemberResponse, + newFollowerResponse, + group, + COMPLETING_REBALANCE, + Collections.emptySet() + ); + } + + @Test + public void testSyncGroupReturnsAnErrorWhenProtocolTypeIsInconsistent() throws Exception { + testSyncGroupProtocolTypeAndNameWith( + Optional.of("protocolType"), + Optional.empty(), + Errors.INCONSISTENT_GROUP_PROTOCOL, + Optional.empty(), + Optional.empty() + ); + } + + @Test + public void testSyncGroupReturnsAnErrorWhenProtocolNameIsInconsistent() throws Exception { + testSyncGroupProtocolTypeAndNameWith( + Optional.empty(), + Optional.of("protocolName"), + Errors.INCONSISTENT_GROUP_PROTOCOL, + Optional.empty(), + Optional.empty() + ); + } + + @Test + public void testSyncGroupSucceedWhenProtocolTypeAndNameAreNotProvided() throws Exception 
{ + testSyncGroupProtocolTypeAndNameWith( + Optional.empty(), + Optional.empty(), + Errors.NONE, + Optional.of("consumer"), + Optional.of("range") + ); + } + + @Test + public void testSyncGroupSucceedWhenProtocolTypeAndNameAreConsistent() throws Exception { + testSyncGroupProtocolTypeAndNameWith( + Optional.of("consumer"), + Optional.of("range"), + Errors.NONE, + Optional.of("consumer"), + Optional.of("range") + ); + } + + private void testSyncGroupProtocolTypeAndNameWith( + Optional<String> protocolType, + Optional<String> protocolName, + Errors expectedError, + Optional<String> expectedProtocolType, + Optional<String> expectedProtocolName + ) throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + GenericGroup group = context.createGenericGroup("group-id"); + + // JoinGroup(leader) with the Protocol Type of the group + JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withGroupInstanceId("leader-instance-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolSuperset() + .build(); + + CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(joinRequest, leaderJoinFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(leaderJoinFuture.isDone()); + + // JoinGroup(follower) with the Protocol Type of the group + CompletableFuture<JoinGroupResponseData> followerJoinFuture = new CompletableFuture<>(); + result = context.sendGenericGroupJoin(joinRequest.setGroupInstanceId("follower-instance-id"), followerJoinFuture); + + assertTrue(result.records().isEmpty()); + assertFalse(followerJoinFuture.isDone()); + + // Advance clock by rebalance timeout to complete join phase. 
+ assertNoOrEmptyResult(context.sleep(10000)); + assertTrue(leaderJoinFuture.isDone()); + assertTrue(followerJoinFuture.isDone()); + assertEquals(Errors.NONE.code(), leaderJoinFuture.get().errorCode()); + assertEquals(Errors.NONE.code(), followerJoinFuture.get().errorCode()); + + String leaderId = leaderJoinFuture.get().memberId(); + String followerId = followerJoinFuture.get().memberId(); + int generationId = leaderJoinFuture.get().generationId(); + + // SyncGroup with the provided Protocol Type and Name + List<SyncGroupRequestAssignment> assignment = new ArrayList<>(); + assignment.add(new SyncGroupRequestAssignment().setMemberId(leaderId)); + SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(leaderId) + .withProtocolType(protocolType.orElse(null)) + .withProtocolName(protocolName.orElse(null)) + .withGenerationId(generationId) + .withAssignment(assignment) + .build(); + + CompletableFuture<SyncGroupResponseData> leaderSyncFuture = new CompletableFuture<>(); + result = context.sendGenericGroupSync(syncRequest, leaderSyncFuture); + + if (expectedError == Errors.NONE) { + assertEquals( + Collections.singletonList(newGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); + // Simulate successful write to log. + result.appendFuture().complete(null); + } else { + assertTrue(result.records().isEmpty()); + } Review Comment: Makes sense. I removed it for most of the tests, at least for the ones that generate records. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org