This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b3116f4f76e KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446) b3116f4f76e is described below commit b3116f4f76ebc8a074e0d7ce38bf46981da44723 Author: Jeff Kim <kimkb2...@gmail.com> AuthorDate: Tue Apr 2 06:16:02 2024 -0400 KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446) This patch completes all awaiting futures when a group is unloaded. Reviewers: David Jacot <dja...@confluent.io> --- .../coordinator/group/GroupCoordinatorShard.java | 1 + .../coordinator/group/GroupMetadataManager.java | 44 ++++++++- .../coordinator/group/classic/ClassicGroup.java | 11 --- .../group/GroupCoordinatorShardTest.java | 22 +++++ .../group/GroupMetadataManagerTest.java | 103 +++++++++++++++++++++ .../group/GroupMetadataManagerTestContext.java | 4 + 6 files changed, 172 insertions(+), 13 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 12c194c331b..be4a9bf7d0a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -639,6 +639,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> { public void onUnloaded() { timer.cancel(GROUP_EXPIRATION_KEY); coordinatorMetrics.deactivateMetricsShard(metricsShard); + groupMetadataManager.onUnloaded(); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 9068ad17efc..9cfe8f617e6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -275,6 +275,7 @@ public class GroupMetadataManager { ); } } + /** * The log context. */ @@ -1920,6 +1921,47 @@ public class GroupMetadataManager { }); } + /** + * Called when the partition is unloaded. + * ClassicGroup: Complete all awaiting join and sync futures. Transition group to Dead. + */ + public void onUnloaded() { + groups.values().forEach(group -> { + switch (group.type()) { + case CONSUMER: + ConsumerGroup consumerGroup = (ConsumerGroup) group; + log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", + consumerGroup.groupId(), consumerGroup.groupEpoch()); + break; + case CLASSIC: + ClassicGroup classicGroup = (ClassicGroup) group; + log.info("[GroupId={}] Unloading group metadata for generation {}.", + classicGroup.groupId(), classicGroup.generationId()); + + classicGroup.transitionTo(DEAD); + switch (classicGroup.previousState()) { + case EMPTY: + case DEAD: + break; + case PREPARING_REBALANCE: + classicGroup.allMembers().forEach(member -> { + classicGroup.completeJoinFuture(member, new JoinGroupResponseData() + .setMemberId(member.memberId()) + .setErrorCode(NOT_COORDINATOR.code())); + }); + + break; + case COMPLETING_REBALANCE: + case STABLE: + classicGroup.allMembers().forEach(member -> { + classicGroup.completeSyncFuture(member, new SyncGroupResponseData() + .setErrorCode(NOT_COORDINATOR.code())); + }); + } + } + }); + } + public static String consumerGroupSessionTimeoutKey(String groupId, String memberId) { return "session-timeout-" + groupId + "-" + memberId; } @@ -3088,7 +3130,6 @@ public class GroupMetadataManager { responseFuture.complete( new JoinGroupResponseData() - .setMembers(Collections.emptyList()) .setMemberId(UNKNOWN_MEMBER_ID) .setGenerationId(group.generationId()) .setProtocolName(group.protocolName().orElse(null)) @@ -3111,7 +3152,6 @@ public class GroupMetadataManager { ); } else { group.completeJoinFuture(newMember, new 
JoinGroupResponseData() - .setMembers(Collections.emptyList()) .setMemberId(newMemberId) .setGenerationId(group.generationId()) .setProtocolName(group.protocolName().orElse(null)) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index 49b087ad3df..3ba31d5d855 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -71,16 +71,6 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL */ public class ClassicGroup implements Group { - /** - * Empty generation. - */ - public static final int NO_GENERATION = -1; - - /** - * Protocol with empty name. - */ - public static final String NO_PROTOCOL_NAME = ""; - /** * No leader. */ @@ -545,7 +535,6 @@ public class ClassicGroup implements Group { JoinGroupResponseData joinGroupResponse = new JoinGroupResponseData() .setMembers(Collections.emptyList()) .setMemberId(oldMemberId) - .setGenerationId(NO_GENERATION) .setProtocolName(null) .setProtocolType(null) .setLeader(NO_LEADER) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 59868f36f10..19c4b366a92 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -1056,4 +1056,26 @@ public class GroupCoordinatorShardTest { assertEquals(records, result.records()); assertNull(result.response()); } + + @Test + public void testOnUnloaded() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + 
OffsetMetadataManager offsetMetadataManager = mock(OffsetMetadataManager.class); + Time mockTime = new MockTime(); + MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(mockTime); + GroupCoordinatorShard coordinator = new GroupCoordinatorShard( + new LogContext(), + groupMetadataManager, + offsetMetadataManager, + mockTime, + timer, + GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 1000L, 24 * 60), + mock(CoordinatorMetrics.class), + mock(CoordinatorMetricsShard.class) + ); + + coordinator.onUnloaded(); + assertEquals(0, timer.size()); + verify(groupMetadataManager, times(1)).onUnloaded(); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index dc21a2140d2..81c582ed4d4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -94,6 +94,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; @@ -113,6 +114,7 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey; import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey; import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks; +import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY; @@ -9520,6 +9522,107 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testClassicGroupOnUnloadedEmptyAndPreparingRebalance() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + + ClassicGroup emptyGroup = context.createClassicGroup("empty-group"); + assertTrue(emptyGroup.isInState(EMPTY)); + + ClassicGroup preparingGroup = context.createClassicGroup("preparing-group"); + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("preparing-group") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // preparing-group should have 2 members. 
+ GroupMetadataManagerTestContext.JoinResult joinResult1 = context.sendClassicGroupJoin(request); + GroupMetadataManagerTestContext.JoinResult joinResult2 = context.sendClassicGroupJoin(request); + + assertFalse(joinResult1.joinFuture.isDone()); + assertFalse(joinResult2.joinFuture.isDone()); + assertTrue(preparingGroup.isInState(PREPARING_REBALANCE)); + assertEquals(2, preparingGroup.size()); + + context.onUnloaded(); + + assertTrue(emptyGroup.isInState(DEAD)); + assertTrue(preparingGroup.isInState(DEAD)); + assertTrue(joinResult1.joinFuture.isDone()); + assertTrue(joinResult2.joinFuture.isDone()); + assertEquals(new JoinGroupResponseData() + .setMemberId(joinResult1.joinFuture.get().memberId()) + .setMembers(Collections.emptyList()) + .setErrorCode(NOT_COORDINATOR.code()), joinResult1.joinFuture.get()); + + assertEquals(new JoinGroupResponseData() + .setMemberId(joinResult2.joinFuture.get().memberId()) + .setMembers(Collections.emptyList()) + .setErrorCode(NOT_COORDINATOR.code()), joinResult2.joinFuture.get()); + } + + @Test + public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); + ClassicGroup group = context.createClassicGroup("group-id"); + + // Set up a group with a leader, follower, and a pending member. + // Have the pending member join the group and both the pending member + // and the follower sync. We should have 2 members awaiting sync. 
+ GroupMetadataManagerTestContext.PendingMemberGroupResult pendingGroupResult = context.setupGroupWithPendingMember(group); + String pendingMemberId = pendingGroupResult.pendingMemberResponse.memberId(); + + // Complete join group for the pending member + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(pendingMemberId) + .withDefaultProtocolTypeAndProtocols() + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); + assertEquals(3, group.allMembers().size()); + assertEquals(0, group.numPendingJoinMembers()); + + // Follower and pending send SyncGroup request. + // Follower and pending member should be awaiting sync while the leader is pending sync. + GroupMetadataManagerTestContext.SyncResult followerSyncResult = context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(pendingGroupResult.followerId) + .withGenerationId(joinResult.joinFuture.get().generationId()) + .build()); + + GroupMetadataManagerTestContext.SyncResult pendingMemberSyncResult = context.sendClassicGroupSync( + new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(pendingMemberId) + .withGenerationId(joinResult.joinFuture.get().generationId()) + .build()); + + assertFalse(followerSyncResult.syncFuture.isDone()); + assertFalse(pendingMemberSyncResult.syncFuture.isDone()); + assertTrue(group.isInState(COMPLETING_REBALANCE)); + + context.onUnloaded(); + + assertTrue(group.isInState(DEAD)); + assertTrue(followerSyncResult.syncFuture.isDone()); + assertTrue(pendingMemberSyncResult.syncFuture.isDone()); + assertEquals(new SyncGroupResponseData() + 
.setAssignment(EMPTY_ASSIGNMENT) + .setErrorCode(NOT_COORDINATOR.code()), followerSyncResult.syncFuture.get()); + assertEquals(new SyncGroupResponseData() + .setAssignment(EMPTY_ASSIGNMENT) + .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get()); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index 4a7cd8cae9e..86b0b12d998 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -1274,4 +1274,8 @@ public class GroupMetadataManagerTestContext { lastWrittenOffset++; snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); } + + void onUnloaded() { + groupMetadataManager.onUnloaded(); + } }