dongnuo123 commented on code in PR #15593:
URL: https://github.com/apache/kafka/pull/15593#discussion_r1550242669


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's members doesn't 
support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
generationId));
+        // SubscriptionMetadata will be computed in the following 
consumerGroupHeartbeat
+        
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), 
Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
generationId));
+        
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(generationId)
+                .setPreviousMemberEpoch(generationId)
+                .setInstanceId(member.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                .setClientId(member.clientId())
+                .setClientHost(member.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .build();
+            consumerGroup.updateMember(newMember);
+
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
newMember));
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
newMember));
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), 
memberId, partitions));
+        });

Review Comment:
   We need to schedule the session timeouts here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to