dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569077848
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -777,6 +778,59 @@ public ClassicGroup classicGroup( } } + public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) { + if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) { + log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", + consumerGroup.groupId()); + return false; + } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) { + log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.", + consumerGroup.groupId()); + return false; + } else if (consumerGroup.numMembers() <= 1) { + log.info("Skip downgrading the consumer group {} to classic group because it's empty.", + consumerGroup.groupId()); + return false; + } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) { + log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", + consumerGroup.groupId()); + } + return true; + } + + public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) { + consumerGroup.createGroupTombstoneRecords(records); + ClassicGroup classicGroup; + try { + classicGroup = consumerGroup.toClassicGroup( + leavingMemberId, + logContext, + time, + consumerGroupSessionTimeoutMs, + metadataImage, + records + ); + } catch (SchemaException e) { + log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " + + "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e); + + throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", + consumerGroup.groupId(), e.getMessage())); + } + + groups.put(consumerGroup.groupId(), classicGroup); Review Comment: I think that we should explicitly remove the previous group before adding the new one because we update metrics when the previous group is removed. We could likely call `removeGroup` for this purpose. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org