dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = consumerGroup.toClassicGroup(
+                leavingMemberId,
+                logContext,
+                time,
+                consumerGroupSessionTimeoutMs,
+                metadataImage,
+                records
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+
+        groups.put(consumerGroup.groupId(), classicGroup);

Review Comment:
   I think that we should explicitly remove the previous group before adding 
the new one because we update metrics when the previous group is removed. We 
could likely call `removeGroup` for this purpose.



