dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1569051446
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+        if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+            log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+                consumerGroup.groupId());
+            return false;
+        } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+            log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() <= 1) {
+            log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+                consumerGroup.groupId());
+            return false;
+        } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+            log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+                consumerGroup.groupId());
+        }
+        return true;
+    }
+
+    public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+        consumerGroup.createGroupTombstoneRecords(records);
+        ClassicGroup classicGroup;
+        try {
+            classicGroup = consumerGroup.toClassicGroup(
+                leavingMemberId,
+                logContext,
+                time,
+                consumerGroupSessionTimeoutMs,
+                metadataImage,
+                records
+            );
+        } catch (SchemaException e) {
+            log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+                "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+                consumerGroup.groupId(), e.getMessage()));
+        }
+
+        groups.put(consumerGroup.groupId(), classicGroup);
+        metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        appendFuture.whenComplete((__, t) -> {
+            if (t == null) {
+                classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+                prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   I do agree with the `appendFuture` part of your explanation. However, I still believe that we should schedule the session timeouts and start the rebalance before it.
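
   To make the ordering question concrete, here is a minimal, self-contained sketch of the two variants. It is not the coordinator code: `DowngradeOrderingSketch`, `deferred`, `eager`, and the two event strings are hypothetical stand-ins for `rescheduleClassicGroupMemberHeartbeat` and `prepareRebalance`, and the sketch only assumes the standard behaviour of `CompletableFuture.whenComplete`. It reflects one reading of the comment, namely that the two steps would run before the future completes rather than inside its callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: records the order in which the two steps run.
final class DowngradeOrderingSketch {

    // Variant in the diff: both steps run only once the append future
    // completes successfully.
    static CompletableFuture<Void> deferred(List<String> events) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
        appendFuture.whenComplete((ignored, t) -> {
            if (t == null) {
                events.add("schedule-session-timeouts");
                events.add("prepare-rebalance");
            }
        });
        return appendFuture;
    }

    // Variant suggested in the review (as read here): both steps run
    // immediately, before the append future is completed.
    static CompletableFuture<Void> eager(List<String> events) {
        events.add("schedule-session-timeouts");
        events.add("prepare-rebalance");
        return new CompletableFuture<>();
    }

    public static void main(String[] args) {
        List<String> deferredEvents = new ArrayList<>();
        CompletableFuture<Void> f1 = deferred(deferredEvents);
        System.out.println("deferred, before completion: " + deferredEvents);
        f1.complete(null);
        System.out.println("deferred, after completion:  " + deferredEvents);

        List<String> eagerEvents = new ArrayList<>();
        eager(eagerEvents);
        System.out.println("eager, before completion:    " + eagerEvents);
    }
}
```

   In the deferred variant nothing happens if the future never completes or completes exceptionally; in the eager variant the steps have already run by the time the write result is known, so any failure handling would need to account for that.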