This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e3c618bdc9 KAFKA-16313: Offline group protocol migration (#15546)
3e3c618bdc9 is described below

commit 3e3c618bdc90ea225632a6aeecb0e65b5ac8294f
Author: Dongnuo Lyu <139248811+dongnuo...@users.noreply.github.com>
AuthorDate: Wed Mar 20 03:49:11 2024 -0400

    KAFKA-16313: Offline group protocol migration (#15546)
    
    This patch enables an empty classic group to be automatically converted
    to a new consumer group and vice versa.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../coordinator/group/GroupCoordinatorShard.java   |   2 +-
 .../coordinator/group/GroupMetadataManager.java    | 147 ++++++++++++++-----
 .../group/GroupCoordinatorShardTest.java           |   8 +-
 .../group/GroupMetadataManagerTest.java            | 156 +++++++++++++++++++--
 .../group/GroupMetadataManagerTestContext.java     |   4 +-
 5 files changed, 266 insertions(+), 51 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index b55dd91cc1c..12c194c331b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -388,7 +388,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
             try {
                 groupMetadataManager.validateDeleteGroup(groupId);
                 numDeletedOffsets += 
offsetMetadataManager.deleteAllOffsets(groupId, records);
-                groupMetadataManager.deleteGroup(groupId, records);
+                groupMetadataManager.createGroupTombstoneRecords(groupId, 
records);
                 deletedGroups.add(groupId);
 
                 resultCollection.add(
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 0a789fa9630..9068ad17efc 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -588,7 +588,9 @@ public class GroupMetadataManager {
      *
      * @param groupId           The group id.
      * @param createIfNotExists A boolean indicating whether the group should 
be
-     *                          created if it does not exist.
+     *                          created if it does not exist or is an empty 
classic group.
+     * @param records           The record list to which the group tombstones 
are written
+     *                          if the group is empty and is a classic group.
      *
      * @return A ConsumerGroup.
      * @throws GroupIdNotFoundException if the group does not exist and 
createIfNotExists is false or
@@ -598,7 +600,8 @@ public class GroupMetadataManager {
      */
     ConsumerGroup getOrMaybeCreateConsumerGroup(
         String groupId,
-        boolean createIfNotExists
+        boolean createIfNotExists,
+        List<Record> records
     ) throws GroupIdNotFoundException {
         Group group = groups.get(groupId);
 
@@ -606,7 +609,7 @@ public class GroupMetadataManager {
             throw new GroupIdNotFoundException(String.format("Consumer group 
%s not found.", groupId));
         }
 
-        if (group == null) {
+        if (group == null || (createIfNotExists && 
maybeDeleteEmptyClassicGroup(group, records))) {
             return new ConsumerGroup(snapshotRegistry, groupId, metrics);
         } else {
             if (group.type() == CONSUMER) {
@@ -619,6 +622,40 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Gets a consumer group by committed offset.
+     *
+     * @param groupId           The group id.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A ConsumerGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not 
a consumer group.
+     */
+    public ConsumerGroup consumerGroup(
+        String groupId,
+        long committedOffset
+    ) throws GroupIdNotFoundException {
+        Group group = group(groupId, committedOffset);
+
+        if (group.type() == CONSUMER) {
+            return (ConsumerGroup) group;
+        } else {
+            // We don't support upgrading/downgrading between protocols at the 
moment so
+            // we throw an exception if a group exists with the wrong type.
+            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group.",
+                groupId));
+        }
+    }
+
+    /**
+     * An overloaded method of {@link 
GroupMetadataManager#consumerGroup(String, long)}
+     */
+    ConsumerGroup consumerGroup(
+        String groupId
+    ) throws GroupIdNotFoundException {
+        return consumerGroup(groupId, Long.MAX_VALUE);
+    }
+
     /**
      * The method should be called on the replay path.
      * Gets or maybe creates a consumer group and updates the groups map if a 
new group is created.
@@ -723,31 +760,6 @@ public class GroupMetadataManager {
         }
     }
 
-    /**
-     * Gets a consumer group by committed offset.
-     *
-     * @param groupId           The group id.
-     * @param committedOffset   A specified committed offset corresponding to 
this shard.
-     *
-     * @return A ConsumerGroup.
-     * @throws GroupIdNotFoundException if the group does not exist or is not 
a consumer group.
-     */
-    public ConsumerGroup consumerGroup(
-        String groupId,
-        long committedOffset
-    ) throws GroupIdNotFoundException {
-        Group group = group(groupId, committedOffset);
-
-        if (group.type() == CONSUMER) {
-            return (ConsumerGroup) group;
-        } else {
-            // We don't support upgrading/downgrading between protocols at the 
moment so
-            // we throw an exception if a group exists with the wrong type.
-            throw new GroupIdNotFoundException(String.format("Group %s is not 
a consumer group.",
-                groupId));
-        }
-    }
-
     /**
      * Removes the group.
      *
@@ -1056,7 +1068,7 @@ public class GroupMetadataManager {
 
         // Get or create the consumer group.
         boolean createIfNotExists = memberEpoch == 0;
-        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
createIfNotExists);
+        final ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
createIfNotExists, records);
         throwIfConsumerGroupIsFull(group, memberId);
 
         // Get or create the member.
@@ -1324,7 +1336,7 @@ public class GroupMetadataManager {
         String memberId,
         int memberEpoch
     ) throws ApiException {
-        ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
+        ConsumerGroup group = consumerGroup(groupId);
         List<Record> records;
         if (instanceId == null) {
             ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
@@ -1449,7 +1461,7 @@ public class GroupMetadataManager {
         String key = consumerGroupSessionTimeoutKey(groupId, memberId);
         timer.schedule(key, consumerGroupSessionTimeoutMs, 
TimeUnit.MILLISECONDS, true, () -> {
             try {
-                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+                ConsumerGroup group = consumerGroup(groupId);
                 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
                 log.info("[GroupId {}] Member {} fenced from the group because 
its session expired.",
                     groupId, memberId);
@@ -1496,7 +1508,7 @@ public class GroupMetadataManager {
         String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
         timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
             try {
-                ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
+                ConsumerGroup group = consumerGroup(groupId);
                 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
                 if (member.memberEpoch() == memberEpoch) {
@@ -2000,6 +2012,7 @@ public class GroupMetadataManager {
         CompletableFuture<JoinGroupResponseData> responseFuture
     ) {
         CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        List<Record> records = new ArrayList<>();
 
         String groupId = request.groupId();
         String memberId = request.memberId();
@@ -2017,6 +2030,7 @@ public class GroupMetadataManager {
             // Group is created if it does not exist and the member id is 
UNKNOWN. if member
             // is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
             ClassicGroup group;
+            maybeDeleteEmptyConsumerGroup(groupId, records);
             boolean isNewGroup = !groups.containsKey(groupId);
             try {
                 group = getOrMaybeCreateClassicGroup(groupId, isUnknownMember);
@@ -2065,7 +2079,7 @@ public class GroupMetadataManager {
                     }
                 });
 
-                List<Record> records = Collections.singletonList(
+                records.add(
                     RecordHelpers.newEmptyGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
                 );
 
@@ -3547,12 +3561,25 @@ public class GroupMetadataManager {
      * @param groupId The id of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.
      * @param records The record list to populate.
      */
-    public void deleteGroup(
+    public void createGroupTombstoneRecords(
         String groupId,
         List<Record> records
     ) {
         // At this point, we have already validated the group id, so we know 
that the group exists and that no exception will be thrown.
-        group(groupId).createGroupTombstoneRecords(records);
+        createGroupTombstoneRecords(group(groupId), records);
+    }
+
+    /**
+     * Populates the record list passed in with record to update the state 
machine.
+     *
+     * @param group The group to be deleted.
+     * @param records The record list to populate.
+     */
+    public void createGroupTombstoneRecords(
+        Group group,
+        List<Record> records
+    ) {
+        group.createGroupTombstoneRecords(records);
     }
 
     /**
@@ -3574,7 +3601,55 @@ public class GroupMetadataManager {
     public void maybeDeleteGroup(String groupId, List<Record> records) {
         Group group = groups.get(groupId);
         if (group != null && group.isEmpty()) {
-            deleteGroup(groupId, records);
+            createGroupTombstoneRecords(groupId, records);
+        }
+    }
+
+    /**
+     * @return true if the group is an empty classic group.
+     */
+    private static boolean isEmptyClassicGroup(Group group) {
+        return group != null && group.type() == CLASSIC && group.isEmpty();
+    }
+
+    /**
+     * @return true if the group is an empty consumer group.
+     */
+    private static boolean isEmptyConsumerGroup(Group group) {
+        return group != null && group.type() == CONSUMER && group.isEmpty();
+    }
+
+    /**
+     * Write tombstones for the group if it's empty and is a classic group.
+     *
+     * @param group     The group to be deleted.
+     * @param records   The list of records to delete the group.
+     *
+     * @return true if the group is empty
+     */
+    private boolean maybeDeleteEmptyClassicGroup(Group group, List<Record> 
records) {
+        if (isEmptyClassicGroup(group)) {
+            // Delete the classic group by adding tombstones.
+            // There's no need to remove the group as the replay of tombstones 
removes it.
+            if (group != null) createGroupTombstoneRecords(group, records);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Delete and write tombstones for the group if it's empty and is a 
consumer group.
+     *
+     * @param groupId The group id to be deleted.
+     * @param records The list of records to delete the group.
+     */
+    private void maybeDeleteEmptyConsumerGroup(String groupId, List<Record> 
records) {
+        Group group = groups.get(groupId, Long.MAX_VALUE);
+        if (isEmptyConsumerGroup(group)) {
+            // Add tombstones for the previous consumer group. The tombstones 
won't actually be
+            // replayed because its coordinator result has a non-null 
appendFuture.
+            createGroupTombstoneRecords(group, records);
+            removeGroup(groupId);
         }
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 1e6930a8029..59868f36f10 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -225,14 +225,14 @@ public class GroupCoordinatorShardTest {
             List<Record> records = invocation.getArgument(1);
             
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
             return null;
-        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), 
anyList());
 
         
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
             coordinator.deleteGroups(context, groupIds);
 
         for (String groupId : groupIds) {
             verify(groupMetadataManager, 
times(1)).validateDeleteGroup(ArgumentMatchers.eq(groupId));
-            verify(groupMetadataManager, 
times(1)).deleteGroup(ArgumentMatchers.eq(groupId), anyList());
+            verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecords(ArgumentMatchers.eq(groupId), anyList());
             verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(ArgumentMatchers.eq(groupId), anyList());
         }
         assertEquals(expectedResult, coordinatorResult);
@@ -291,7 +291,7 @@ public class GroupCoordinatorShardTest {
             List<Record> records = invocation.getArgument(1);
             
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId));
             return null;
-        }).when(groupMetadataManager).deleteGroup(anyString(), anyList());
+        }).when(groupMetadataManager).createGroupTombstoneRecords(anyString(), 
anyList());
 
         
CoordinatorResult<DeleteGroupsResponseData.DeletableGroupResultCollection, 
Record> coordinatorResult =
             coordinator.deleteGroups(context, groupIds);
@@ -299,7 +299,7 @@ public class GroupCoordinatorShardTest {
         for (String groupId : groupIds) {
             verify(groupMetadataManager, 
times(1)).validateDeleteGroup(eq(groupId));
             if (!groupId.equals("group-id-2")) {
-                verify(groupMetadataManager, 
times(1)).deleteGroup(eq(groupId), anyList());
+                verify(groupMetadataManager, 
times(1)).createGroupTombstoneRecords(eq(groupId), anyList());
                 verify(offsetMetadataManager, 
times(1)).deleteAllOffsets(eq(groupId), anyList());
             }
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 43703059915..dc21a2140d2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -434,7 +434,7 @@ public class GroupMetadataManagerTest {
         ));
 
         assertThrows(GroupIdNotFoundException.class, () ->
-            
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false));
+            context.groupMetadataManager.consumerGroup(groupId));
 
         CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
@@ -2382,7 +2382,7 @@ public class GroupMetadataManagerTest {
 
         // The metadata refresh flag should be true.
         ConsumerGroup consumerGroup = context.groupMetadataManager
-            .getOrMaybeCreateConsumerGroup(groupId, false);
+            .consumerGroup(groupId);
         
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
 
         // Prepare the assignment result.
@@ -2493,7 +2493,7 @@ public class GroupMetadataManagerTest {
 
         // The metadata refresh flag should be true.
         ConsumerGroup consumerGroup = context.groupMetadataManager
-            .getOrMaybeCreateConsumerGroup(groupId, false);
+            .consumerGroup(groupId);
         
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
 
         // Prepare the assignment result.
@@ -2731,7 +2731,7 @@ public class GroupMetadataManagerTest {
 
         // Ensures that all refresh flags are set to the future.
         Arrays.asList("group1", "group2", "group3", "group4", 
"group5").forEach(groupId -> {
-            ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
             group.setMetadataRefreshDeadline(context.time.milliseconds() + 
5000L, 0);
             assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
         });
@@ -2768,12 +2768,12 @@ public class GroupMetadataManagerTest {
 
         // Verify the groups.
         Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-            ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
             assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
         });
 
         Collections.singletonList("group5").forEach(groupId -> {
-            ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup group = 
context.groupMetadataManager.consumerGroup(groupId);
             assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
         });
 
@@ -9167,7 +9167,7 @@ public class GroupMetadataManagerTest {
 
         List<Record> expectedRecords = 
Collections.singletonList(RecordHelpers.newGroupMetadataTombstoneRecord("group-id"));
         List<Record> records = new ArrayList<>();
-        context.groupMetadataManager.deleteGroup("group-id", records);
+        context.groupMetadataManager.createGroupTombstoneRecords("group-id", 
records);
         assertEquals(expectedRecords, records);
     }
 
@@ -9205,7 +9205,7 @@ public class GroupMetadataManagerTest {
             RecordHelpers.newGroupEpochTombstoneRecord(groupId)
         );
         List<Record> records = new ArrayList<>();
-        context.groupMetadataManager.deleteGroup(groupId, records);
+        context.groupMetadataManager.createGroupTombstoneRecords("group-id", 
records);
         assertEquals(expectedRecords, records);
     }
 
@@ -9380,6 +9380,146 @@ public class GroupMetadataManagerTest {
         verify(context.metrics, 
times(1)).onConsumerGroupStateTransition(ConsumerGroup.ConsumerGroupState.EMPTY,
 null);
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setServerAssignor("range")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testConsumerGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time,
+            context.metrics
+        );
+        context.replay(RecordHelpers.newGroupMetadataRecord(classicGroup, 
classicGroup.groupAssignment(), MetadataVersion.latestTesting()));
+
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        ConsumerGroupMember expectedMember = new 
ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(Collections.emptyMap())
+            .build();
+
+        assertEquals(Errors.NONE.code(), result.response().errorCode());
+        assertEquals(
+            Arrays.asList(
+                RecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
+                RecordHelpers.newMemberSubscriptionRecord(classicGroupId, 
expectedMember),
+                RecordHelpers.newGroupEpochRecord(classicGroupId, 1),
+                RecordHelpers.newTargetAssignmentRecord(classicGroupId, 
memberId, Collections.emptyMap()),
+                RecordHelpers.newTargetAssignmentEpochRecord(classicGroupId, 
1),
+                RecordHelpers.newCurrentAssignmentRecord(classicGroupId, 
expectedMember)
+            ),
+            result.records()
+        );
+        assertEquals(
+            Group.GroupType.CONSUMER,
+            context.groupMetadataManager.consumerGroup(classicGroupId).type()
+        );
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyConsumerGroup() throws 
Exception {
+        String consumerGroupId = "consumer-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)
+            .withMember(new ConsumerGroupMember.Builder(memberId)
+                .setState(MemberState.STABLE)
+                .setMemberEpoch(10)
+                .setPreviousMemberEpoch(10)
+                .build()))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), 
joinResult.joinFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupJoinWithEmptyConsumerGroup() throws Exception {
+        String consumerGroupId = "consumer-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(consumerGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request, true);
+
+        List<Record> expectedRecords = Arrays.asList(
+            
RecordHelpers.newTargetAssignmentEpochTombstoneRecord(consumerGroupId),
+            
RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(consumerGroupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(consumerGroupId)
+        );
+
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(expectedRecords, joinResult.records.subList(0, 
expectedRecords.size()));
+        assertEquals(
+            Group.GroupType.CLASSIC,
+            
context.groupMetadataManager.getOrMaybeCreateClassicGroup(consumerGroupId, 
false).type()
+        );
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index a02359ffed1..4a7cd8cae9e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -483,7 +483,7 @@ public class GroupMetadataManagerTestContext {
         String groupId
     ) {
         return groupMetadataManager
-            .getOrMaybeCreateConsumerGroup(groupId, false)
+            .consumerGroup(groupId)
             .state();
     }
 
@@ -492,7 +492,7 @@ public class GroupMetadataManagerTestContext {
         String memberId
     ) {
         return groupMetadataManager
-            .getOrMaybeCreateConsumerGroup(groupId, false)
+            .consumerGroup(groupId)
             .getOrMaybeCreateMember(memberId, false)
             .state();
     }

Reply via email to