dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568737120


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;

Review Comment:
   nit: Let's add an empty line before this one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {

Review Comment:
   Does it need to be public? Let's add some javadoc please.
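   
   As a rough sketch of the kind of javadoc I have in mind (the wording and the reduced visibility are only suggestions):
   
   ```
   /**
    * Converts the given consumer group into a classic group when the last member
    * using the consumer protocol leaves.
    *
    * @param consumerGroup   The consumer group to be downgraded.
    * @param leavingMemberId The member that triggered the downgrade and is not
    *                        carried over to the classic group.
    * @param records         The list to which the conversion records are added.
    * @return A future to be completed once the records have been written to the log.
    */
   private CompletableFuture<Void> convertToClassicGroup(
       ConsumerGroup consumerGroup,
       String leavingMemberId,
       List<Record> records
   ) {
       // ...
   }
   ```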



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9623,6 +10332,186 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
             .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLastClassicProtocolMemberLeavingConsumerGroup(boolean appendLogSuccessfully) {

Review Comment:
   Should we also test the session expiration path and the rebalance expiration path?
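   
   A rough sketch of what the session expiration case could look like, reusing the setup of the test quoted above; the helper names are the ones the existing tests in this class use, so treat it as a starting point only:
   
   ```
   // After a first heartbeat from member 2 (so that its session timer is
   // actually scheduled), advance the mock timer past the session timeout
   // instead of sending a leave-group heartbeat.
   List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(45000 + 1);
   
   // The expired session should fence member 2 and, since member 1 is the only
   // remaining member and uses the classic protocol, produce the same downgrade
   // records as the leave-group path.
   assertFalse(timeouts.isEmpty());
   ```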



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment())
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+            );
+
+            // The target assignment and the assigned partitions of each member are set based on the last
+            // assignment of the classic group. All the members are put in the Stable state. If the classic
+            // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+            // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+        );
+    }
+
+    /**
+     * @return The map of topic id and partition set converted from the list of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Create a corresponding ClassicGroup and append the record for the creation for group downgrade.
+     * The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
+     * member using new consumer protocol that left and triggered the downgrade.
+     *
+     * @param leavingMemberId               The member that will not be converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the ClassicGroup.
+     * @param time                          The time to create the ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The metadataImage.
+     * @param records                       The record list.
+     * @return  The created ClassicGroup.
+     */
+    public ClassicGroup toClassicGroup(
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage,
+        List<Record> records
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            members().keySet().stream().filter(member -> !member.equals(leavingMemberId)).findAny(),

Review Comment:
   I think that we could use `Optional.empty()` here because a leader will be picked when the members are added.
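   
   i.e. only the leader argument changes, the rest of the constructor call stays as in the patch:
   
   ```
   ClassicGroup classicGroup = new ClassicGroup(
       logContext,
       groupId(),
       ClassicGroupState.STABLE,
       time,
       metrics,
       groupEpoch(),
       Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
       Optional.empty(),
       // No need to pre-select a leader here: one is picked when the first
       // member is added below.
       Optional.empty(),
       Optional.of(time.milliseconds())
   );
   ```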



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = consumerGroup.toClassicGroup(
+                leavingMemberId,
+                logContext,
+                time,
+                consumerGroupSessionTimeoutMs,
+                metadataImage,
+                records
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+
+        groups.put(consumerGroup.groupId(), classicGroup);
+        metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+                prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   I am not sure I understand why we do this here. In my opinion, it is better to create all the state immediately. However, if t != null, I think that we need to revert changes made by `onClassicGroupStateTransition` at L821.
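   
   For example, a minimal sketch of the failure handling (the exact shape is up to you; the point is reverting the non-timeline state created before the write):
   
   ```
   appendFuture.whenComplete((__, t) -> {
       if (t == null) {
           classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
       } else {
           // The timeline-backed structures are rolled back by the runtime on a
           // failed write, but the metrics shard is not, so undo the transition
           // recorded when the classic group was created.
           metrics.onClassicGroupStateTransition(classicGroup.currentState(), null);
       }
   });
   ```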



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = consumerGroup.toClassicGroup(

Review Comment:
   I wonder if we should rather follow the same structure that we used in this other PR. I mean that we could have a static method in ClassicGroup and we could have a createClassicGroupRecords. What do you think?
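   
   Roughly, the call sites here would then look something like this (names are illustrative only, mirroring the ConsumerGroup.fromClassicGroup / createConsumerGroupRecords pair in the other direction):
   
   ```
   ClassicGroup classicGroup = ClassicGroup.fromConsumerGroup(
       consumerGroup,
       leavingMemberId,
       logContext,
       time,
       consumerGroupSessionTimeoutMs,
       metadataImage
   );
   classicGroup.createClassicGroupRecords(metadataImage.features().metadataVersion(), records);
   ```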



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1369,11 +1493,16 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                 log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.",
                     group.groupId(), memberId, instanceId);
                 records = consumerGroupFenceMember(group, member);
+                if (validateOnlineDowngrade(group, memberId)) appendFuture = convertToClassicGroup(group, memberId, records);
             }
         }
-        return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
-            .setMemberId(memberId)
-            .setMemberEpoch(memberEpoch));
+        return new CoordinatorResult<>(

Review Comment:
   Could we extract the code from both branches and have it here?
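   
   i.e. something along these lines (a sketch; it assumes the CoordinatorResult constructor taking the append future that this PR introduces):
   
   ```
   return new CoordinatorResult<>(
       records,
       new ConsumerGroupHeartbeatResponseData()
           .setMemberId(memberId)
           .setMemberEpoch(memberEpoch),
       appendFuture
   );
   ```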



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment())
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+            );
+
+            // The target assignment and the assigned partitions of each member are set based on the last
+            // assignment of the classic group. All the members are put in the Stable state. If the classic
+            // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+            // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+        );
+    }
+
+    /**
+     * @return The map of topic id and partition set converted from the list of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Create a corresponding ClassicGroup and append the record for the creation for group downgrade.
+     * The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
+     * member using new consumer protocol that left and triggered the downgrade.
+     *
+     * @param leavingMemberId               The member that will not be converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the ClassicGroup.
+     * @param time                          The time to create the ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The metadataImage.
+     * @param records                       The record list.
+     * @return  The created ClassicGroup.
+     */
+    public ClassicGroup toClassicGroup(
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage,
+        List<Record> records
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            members().keySet().stream().filter(member -> !member.equals(leavingMemberId)).findAny(),
+            Optional.of(time.milliseconds())
+        );
+
+        members().forEach((memberId, member) -> {
+            if (!memberId.equals(leavingMemberId)) {
+                classicGroup.add(
+                    new ClassicGroupMember(
+                        memberId,
+                        Optional.ofNullable(member.instanceId()),
+                        member.clientId(),
+                        member.clientHost(),
+                        member.rebalanceTimeoutMs(),
+                        consumerGroupSessionTimeoutMs,
+                        ConsumerProtocol.PROTOCOL_TYPE,
+                        member.supportedJoinGroupRequestProtocols(),

Review Comment:
   This does not seem correct. The issue is that the stored protocols were stored when the group was converted, so they are outdated here. I wonder whether we should generate them with the minimal information (e.g. the subscribed topics).
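   
   As a rough illustration of what regenerating them from the minimal information could look like (the protocol name and the use of only the subscribed topic names are assumptions on my side):
   
   ```
   JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
       new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
   protocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
       .setName("range")
       .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(
           // Rebuild the subscription metadata from the member's current
           // subscribed topics instead of reusing the bytes captured when the
           // group was upgraded.
           new ConsumerPartitionAssignor.Subscription(new ArrayList<>(member.subscribedTopicNames()))
       ))));
   ```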



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment())
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+            );
+
+            // The target assignment and the assigned partitions of each member are set based on the last
+            // assignment of the classic group. All the members are put in the Stable state. If the classic
+            // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+            // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+        );
+    }
+
+    /**
+     * @return The map of topic id and partition set converted from the list of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Create a corresponding ClassicGroup and append the record for the creation for group downgrade.
+     * The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
+     * member using new consumer protocol that left and triggered the downgrade.
+     *
+     * @param leavingMemberId               The member that will not be converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the ClassicGroup.
+     * @param time                          The time to create the ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The metadataImage.
+     * @param records                       The record list.
+     * @return  The created ClassicGroup.
+     */
+    public ClassicGroup toClassicGroup(
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage,
+        List<Record> records
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            members().keySet().stream().filter(member -> !member.equals(leavingMemberId)).findAny(),
+            Optional.of(time.milliseconds())
+        );
+
+        members().forEach((memberId, member) -> {
+            if (!memberId.equals(leavingMemberId)) {
+                classicGroup.add(
+                    new ClassicGroupMember(
+                        memberId,
+                        Optional.ofNullable(member.instanceId()),
+                        member.clientId(),
+                        member.clientHost(),
+                        member.rebalanceTimeoutMs(),
+                        consumerGroupSessionTimeoutMs,
+                        ConsumerProtocol.PROTOCOL_TYPE,
+                        member.supportedJoinGroupRequestProtocols(),
+                        null
+                    )
+                );
+            }
+        });
+
+        classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
+        classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
+
+        Map<String, byte[]> assignments = new HashMap<>();
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(ConsumerGroup.topicPartitionListFromMap(
+                    targetAssignment().get(classicGroupMember.memberId()).partitions(),
+                    metadataImage.topics()
+                )),
+                ConsumerProtocol.deserializeVersion(
+                    ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().orElse("")))
+                )

Review Comment:
   This one is maybe worth a comment explaining why we need to use this version.
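   
   Something along these lines, if my reading of the intent is right:
   
   ```
   // Serialize the assignment with the same ConsumerProtocol version that the
   // member advertised in its subscription metadata, so that the classic
   // consumer can still deserialize the assignment it receives after the
   // downgrade.
   ```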



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9623,6 +10332,186 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
             .setErrorCode(NOT_COORDINATOR.code()), pendingMemberSyncResult.syncFuture.get());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testLastClassicProtocolMemberLeavingConsumerGroup(boolean appendLogSuccessfully) {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Use zar only here to ensure that metadata needs to be recomputed.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.commit();
+        ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId);
+
+        // Member 2 leaves the consumer group, triggering the downgrade.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+
+        byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(Arrays.asList(
+            new TopicPartition(fooTopicName, 0), new TopicPartition(fooTopicName, 1), new TopicPartition(fooTopicName, 2),
+            new TopicPartition(barTopicName, 0), new TopicPartition(barTopicName, 1)
+        ))));
+        Map<String, byte[]> assignments = new HashMap<String, byte[]>() {
+            {
+                put(memberId1, assignment);
+            }
+        };
+
+        ClassicGroup expectedClassicGroup = new ClassicGroup(
+            new LogContext(),
+            groupId,
+            STABLE,
+            context.time,
+            context.metrics,
+            10,
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.ofNullable("range"),
+            Optional.ofNullable(memberId1),
+            Optional.of(context.time.milliseconds())
+        );
+        expectedClassicGroup.add(
+            new ClassicGroupMember(
+                memberId1,
+                Optional.ofNullable(member1.instanceId()),
+                member1.clientId(),
+                member1.clientHost(),
+                member1.rebalanceTimeoutMs(),
+                45000,
+                ConsumerProtocol.PROTOCOL_TYPE,
+                member1.supportedJoinGroupRequestProtocols(),
+                assignment
+            )
+        );
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, memberId2),
+            RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId2),
+            // Subscription metadata is recomputed because zar is no longer there.
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+
+            RecordHelpers.newTargetAssignmentEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupSubscriptionMetadataTombstoneRecord(groupId),
+            RecordHelpers.newGroupEpochTombstoneRecord(groupId),
+            RecordHelpers.newGroupMetadataRecord(expectedClassicGroup, assignments, MetadataVersion.latestTesting())
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        if (appendLogSuccessfully) {
+            ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+
+            // Simulate a successful write to the log.
+            result.appendFuture().complete(null);
+
+            ScheduledTimeout<Void, Record> timeout = context.timer.timeout(
+                classicGroupHeartbeatKey(groupId, memberId1));
+            assertNotNull(timeout);
+
+            // A new rebalance is triggered.
+            assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+        } else {
+            // Simulate a failed write to the log.
+            result.appendFuture().completeExceptionally(new NotLeaderOrFollowerException());
+            context.rollback();
+
+            ScheduledTimeout<Void, Record> timeout = context.timer.timeout(
+                classicGroupHeartbeatKey(groupId, memberId1));
+            assertNull(timeout);
+
+            // The group is reverted back to the consumer group.
+            assertEquals(consumerGroup, context.groupMetadataManager.consumerGroup(groupId));
+        }

Review Comment:
   We should also verify the metrics as we had code related to the group conversion.
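   
   For example (a sketch; it assumes the metrics shard in the test context is a mock that can be verified, as in the other classic group tests):
   
   ```
   // The conversion should have recorded the creation of the classic group.
   verify(context.metrics).onClassicGroupStateTransition(null, STABLE);
   ```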



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
         );
         return describedGroup;
     }
+
+    /**
+     * Create a new consumer group according to the given classic group.
+     *
+     * @param snapshotRegistry  The SnapshotRegistry.
+     * @param metrics           The GroupCoordinatorMetricsShard.
+     * @param classicGroup      The converted classic group.
+     * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+     * @return  The created ConsumerGruop.
+     */
+    public static ConsumerGroup fromClassicGroup(
+        SnapshotRegistry snapshotRegistry,
+        GroupCoordinatorMetricsShard metrics,
+        ClassicGroup classicGroup,
+        TopicsImage topicsImage
+    ) {
+        String groupId = classicGroup.groupId();
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+        consumerGroup.setGroupEpoch(classicGroup.generationId());
+        consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+                ByteBuffer.wrap(classicGroupMember.assignment())
+            );
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+                ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+            );
+
+            // The target assignment and the assigned partitions of each member are set based on the last
+            // assignment of the classic group. All the members are put in the Stable state. If the classic
+            // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+            // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+                .setMemberEpoch(classicGroup.generationId())
+                .setState(MemberState.STABLE)
+                .setPreviousMemberEpoch(classicGroup.generationId())
+                .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+                .setClientId(classicGroupMember.clientId())
+                .setClientHost(classicGroupMember.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+                .build();
+            consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+            consumerGroup.updateMember(newMember);
+        });
+
+        return consumerGroup;
+    }
+
+    /**
+     * Populate the record list with the records needed to create the given consumer group.
+     *
+     * @param records The list to which the new records are added.
+     */
+    public void createConsumerGroupRecords(
+        List<Record> records
+    ) {
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+        );
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+            records.add(RecordHelpers.newTargetAssignmentRecord(
+                groupId(),
+                consumerGroupMemberId,
+                targetAssignment(consumerGroupMemberId).partitions()
+            ))
+        );
+
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+        members().forEach((__, consumerGroupMember) ->
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+        );
+    }
+
+    /**
+     * @return The map of topic id and partition set converted from the list of TopicPartition.
+     */
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            if (topicImage != null) {
+                topicPartitionMap
+                    .computeIfAbsent(topicImage.id(), __ -> new HashSet<>())
+                    .add(topicPartition.partition());
+            }
+        });
+        return topicPartitionMap;
+    }
+
+    /**
+     * Create a corresponding ClassicGroup and append the record for the creation for group downgrade.
+     * The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
+     * member using new consumer protocol that left and triggered the downgrade.
+     *
+     * @param leavingMemberId               The member that will not be converted in the ClassicGroup.
+     * @param logContext                    The logContext to create the ClassicGroup.
+     * @param time                          The time to create the ClassicGroup.
+     * @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
+     * @param metadataImage                 The metadataImage.
+     * @param records                       The record list.
+     * @return  The created ClassicGroup.
+     */
+    public ClassicGroup toClassicGroup(
+        String leavingMemberId,
+        LogContext logContext,
+        Time time,
+        int consumerGroupSessionTimeoutMs,
+        MetadataImage metadataImage,
+        List<Record> records
+    ) {
+        ClassicGroup classicGroup = new ClassicGroup(
+            logContext,
+            groupId(),
+            ClassicGroupState.STABLE,
+            time,
+            metrics,
+            groupEpoch(),
+            Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
+            Optional.empty(),
+            members().keySet().stream().filter(member -> !member.equals(leavingMemberId)).findAny(),
+            Optional.of(time.milliseconds())
+        );
+
+        members().forEach((memberId, member) -> {
+            if (!memberId.equals(leavingMemberId)) {
+                classicGroup.add(
+                    new ClassicGroupMember(
+                        memberId,
+                        Optional.ofNullable(member.instanceId()),
+                        member.clientId(),
+                        member.clientHost(),
+                        member.rebalanceTimeoutMs(),
+                        consumerGroupSessionTimeoutMs,
+                        ConsumerProtocol.PROTOCOL_TYPE,
+                        member.supportedJoinGroupRequestProtocols(),
+                        null
+                    )
+                );
+            }
+        });
+
+        classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
+        classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
+
+        Map<String, byte[]> assignments = new HashMap<>();
+
+        classicGroup.allMembers().forEach(classicGroupMember -> {
+            byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
+                new ConsumerPartitionAssignor.Assignment(ConsumerGroup.topicPartitionListFromMap(
+                    targetAssignment().get(classicGroupMember.memberId()).partitions(),
+                    metadataImage.topics()
+                )),
+                ConsumerProtocol.deserializeVersion(
+                    ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().orElse("")))
+                )
+            ));
+
+            classicGroupMember.setAssignment(assignment);
+            assignments.put(classicGroupMember.memberId(), assignment);
+        });
+
+        records.add(RecordHelpers.newGroupMetadataRecord(
+            classicGroup, assignments, metadataImage.features().metadataVersion()));

Review Comment:
   nit: Style.
   
   ```
    records.add(RecordHelpers.newGroupMetadataRecord(
        classicGroup,
        assignments,
        metadataImage.features().metadataVersion()
   ));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
