This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3116f4f76e KAFKA-16148: Implement GroupMetadataManager#onUnloaded 
(#15446)
b3116f4f76e is described below

commit b3116f4f76ebc8a074e0d7ce38bf46981da44723
Author: Jeff Kim <kimkb2...@gmail.com>
AuthorDate: Tue Apr 2 06:16:02 2024 -0400

    KAFKA-16148: Implement GroupMetadataManager#onUnloaded (#15446)
    
    This patch completes all awaiting futures when a group is unloaded.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../coordinator/group/GroupCoordinatorShard.java   |   1 +
 .../coordinator/group/GroupMetadataManager.java    |  44 ++++++++-
 .../coordinator/group/classic/ClassicGroup.java    |  11 ---
 .../group/GroupCoordinatorShardTest.java           |  22 +++++
 .../group/GroupMetadataManagerTest.java            | 103 +++++++++++++++++++++
 .../group/GroupMetadataManagerTestContext.java     |   4 +
 6 files changed, 172 insertions(+), 13 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 12c194c331b..be4a9bf7d0a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -639,6 +639,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
     public void onUnloaded() {
         timer.cancel(GROUP_EXPIRATION_KEY);
         coordinatorMetrics.deactivateMetricsShard(metricsShard);
+        groupMetadataManager.onUnloaded();
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9068ad17efc..9cfe8f617e6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -275,6 +275,7 @@ public class GroupMetadataManager {
             );
         }
     }
+
     /**
      * The log context.
      */
@@ -1920,6 +1921,47 @@ public class GroupMetadataManager {
         });
     }
 
+    /**
+     * Called when the partition is unloaded.
+     * ClassicGroup: Complete all awaiting join and sync futures. Transition 
group to Dead.
+     */
+    public void onUnloaded() {
+        groups.values().forEach(group -> {
+            switch (group.type()) {
+                case CONSUMER:
+                    ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                    log.info("[GroupId={}] Unloaded group metadata for group 
epoch {}.",
+                        consumerGroup.groupId(), consumerGroup.groupEpoch());
+                    break;
+                case CLASSIC:
+                    ClassicGroup classicGroup = (ClassicGroup) group;
+                    log.info("[GroupId={}] Unloading group metadata for 
generation {}.",
+                        classicGroup.groupId(), classicGroup.generationId());
+
+                    classicGroup.transitionTo(DEAD);
+                    switch (classicGroup.previousState()) {
+                        case EMPTY:
+                        case DEAD:
+                            break;
+                        case PREPARING_REBALANCE:
+                            classicGroup.allMembers().forEach(member -> {
+                                classicGroup.completeJoinFuture(member, new 
JoinGroupResponseData()
+                                    .setMemberId(member.memberId())
+                                    .setErrorCode(NOT_COORDINATOR.code()));
+                            });
+
+                            break;
+                        case COMPLETING_REBALANCE:
+                        case STABLE:
+                            classicGroup.allMembers().forEach(member -> {
+                                classicGroup.completeSyncFuture(member, new 
SyncGroupResponseData()
+                                    .setErrorCode(NOT_COORDINATOR.code()));
+                            });
+                    }
+            }
+        });
+    }
+
     public static String consumerGroupSessionTimeoutKey(String groupId, String 
memberId) {
         return "session-timeout-" + groupId + "-" + memberId;
     }
@@ -3088,7 +3130,6 @@ public class GroupMetadataManager {
 
                         responseFuture.complete(
                             new JoinGroupResponseData()
-                                .setMembers(Collections.emptyList())
                                 .setMemberId(UNKNOWN_MEMBER_ID)
                                 .setGenerationId(group.generationId())
                                 
.setProtocolName(group.protocolName().orElse(null))
@@ -3111,7 +3152,6 @@ public class GroupMetadataManager {
                         );
                     } else {
                         group.completeJoinFuture(newMember, new 
JoinGroupResponseData()
-                            .setMembers(Collections.emptyList())
                             .setMemberId(newMemberId)
                             .setGenerationId(group.generationId())
                             .setProtocolName(group.protocolName().orElse(null))
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 49b087ad3df..3ba31d5d855 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -71,16 +71,6 @@ import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABL
  */
 public class ClassicGroup implements Group {
 
-    /**
-     * Empty generation.
-     */
-    public static final int NO_GENERATION = -1;
-
-    /**
-     * Protocol with empty name.
-     */
-    public static final String NO_PROTOCOL_NAME = "";
-
     /**
      * No leader.
      */
@@ -545,7 +535,6 @@ public class ClassicGroup implements Group {
         JoinGroupResponseData joinGroupResponse = new JoinGroupResponseData()
             .setMembers(Collections.emptyList())
             .setMemberId(oldMemberId)
-            .setGenerationId(NO_GENERATION)
             .setProtocolName(null)
             .setProtocolType(null)
             .setLeader(NO_LEADER)
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 59868f36f10..19c4b366a92 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1056,4 +1056,26 @@ public class GroupCoordinatorShardTest {
         assertEquals(records, result.records());
         assertNull(result.response());
     }
+
+    @Test
+    public void testOnUnloaded() {
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        OffsetMetadataManager offsetMetadataManager = 
mock(OffsetMetadataManager.class);
+        Time mockTime = new MockTime();
+        MockCoordinatorTimer<Void, Record> timer = new 
MockCoordinatorTimer<>(mockTime);
+        GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+            new LogContext(),
+            groupMetadataManager,
+            offsetMetadataManager,
+            mockTime,
+            timer,
+            GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 
1000L, 24 * 60),
+            mock(CoordinatorMetrics.class),
+            mock(CoordinatorMetricsShard.class)
+        );
+
+        coordinator.onUnloaded();
+        assertEquals(0, timer.size());
+        verify(groupMetadataManager, times(1)).onUnloaded();
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index dc21a2140d2..81c582ed4d4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -94,6 +94,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -113,6 +114,7 @@ import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESU
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupSyncKey;
 import static 
org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
+import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD;
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.EMPTY;
@@ -9520,6 +9522,107 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testClassicGroupOnUnloadedEmptyAndPreparingRebalance() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        ClassicGroup emptyGroup = context.createClassicGroup("empty-group");
+        assertTrue(emptyGroup.isInState(EMPTY));
+
+        ClassicGroup preparingGroup = 
context.createClassicGroup("preparing-group");
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId("preparing-group")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // preparing-group should have 2 members.
+        GroupMetadataManagerTestContext.JoinResult joinResult1 = 
context.sendClassicGroupJoin(request);
+        GroupMetadataManagerTestContext.JoinResult joinResult2 = 
context.sendClassicGroupJoin(request);
+
+        assertFalse(joinResult1.joinFuture.isDone());
+        assertFalse(joinResult2.joinFuture.isDone());
+        assertTrue(preparingGroup.isInState(PREPARING_REBALANCE));
+        assertEquals(2, preparingGroup.size());
+
+        context.onUnloaded();
+
+        assertTrue(emptyGroup.isInState(DEAD));
+        assertTrue(preparingGroup.isInState(DEAD));
+        assertTrue(joinResult1.joinFuture.isDone());
+        assertTrue(joinResult2.joinFuture.isDone());
+        assertEquals(new JoinGroupResponseData()
+            .setMemberId(joinResult1.joinFuture.get().memberId())
+            .setMembers(Collections.emptyList())
+            .setErrorCode(NOT_COORDINATOR.code()), 
joinResult1.joinFuture.get());
+
+        assertEquals(new JoinGroupResponseData()
+            .setMemberId(joinResult2.joinFuture.get().memberId())
+            .setMembers(Collections.emptyList())
+            .setErrorCode(NOT_COORDINATOR.code()), 
joinResult2.joinFuture.get());
+    }
+
+    @Test
+    public void testClassicGroupOnUnloadedCompletingRebalance() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        ClassicGroup group = context.createClassicGroup("group-id");
+
+        // Set up a group in with a leader, follower, and a pending member.
+        // Have the pending member join the group and both the pending member
+        // and the follower sync. We should have 2 members awaiting sync.
+        GroupMetadataManagerTestContext.PendingMemberGroupResult 
pendingGroupResult = context.setupGroupWithPendingMember(group);
+        String pendingMemberId = 
pendingGroupResult.pendingMemberResponse.memberId();
+
+        // Compete join group for the pending member
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(pendingMemberId)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+
+        assertTrue(joinResult.records.isEmpty());
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(3, group.allMembers().size());
+        assertEquals(0, group.numPendingJoinMembers());
+
+        // Follower and pending send SyncGroup request.
+        // Follower and pending member should be awaiting sync while the 
leader is pending sync.
+        GroupMetadataManagerTestContext.SyncResult followerSyncResult = 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(pendingGroupResult.followerId)
+                .withGenerationId(joinResult.joinFuture.get().generationId())
+                .build());
+
+        GroupMetadataManagerTestContext.SyncResult pendingMemberSyncResult = 
context.sendClassicGroupSync(
+            new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(pendingMemberId)
+                .withGenerationId(joinResult.joinFuture.get().generationId())
+                .build());
+
+        assertFalse(followerSyncResult.syncFuture.isDone());
+        assertFalse(pendingMemberSyncResult.syncFuture.isDone());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        context.onUnloaded();
+
+        assertTrue(group.isInState(DEAD));
+        assertTrue(followerSyncResult.syncFuture.isDone());
+        assertTrue(pendingMemberSyncResult.syncFuture.isDone());
+        assertEquals(new SyncGroupResponseData()
+            .setAssignment(EMPTY_ASSIGNMENT)
+            .setErrorCode(NOT_COORDINATOR.code()), 
followerSyncResult.syncFuture.get());
+        assertEquals(new SyncGroupResponseData()
+            .setAssignment(EMPTY_ASSIGNMENT)
+            .setErrorCode(NOT_COORDINATOR.code()), 
pendingMemberSyncResult.syncFuture.get());
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 4a7cd8cae9e..86b0b12d998 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -1274,4 +1274,8 @@ public class GroupMetadataManagerTestContext {
         lastWrittenOffset++;
         snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
     }
+
+    void onUnloaded() {
+        groupMetadataManager.onUnloaded();
+    }
 }

Reply via email to