dajac commented on code in PR #15442: URL: https://github.com/apache/kafka/pull/15442#discussion_r1507302228
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -474,6 +474,8 @@ public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( final List<Record> records = new ArrayList<>(); final long currentTimeMs = time.milliseconds(); final OptionalLong expireTimestampMs = expireTimestampMs(request.retentionTimeMs(), currentTimeMs); + groupMetadataManager.maybeUpgradeEmptyGroup(group.groupId(), records, true); + final int initialRecordsSize = records.size(); Review Comment: Why are we doing this here? I am not sure to follow. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3500,6 +3503,59 @@ public void maybeDeleteGroup(String groupId, List<Record> records) { } } + /** + * A group can be upgraded offline if it's a classic group and empty. + * + * @param groupId The group to be validated. + * @return true if the offline upgrade is valid. + */ + private boolean validateOfflineUpgrade(String groupId) { + Group group = groups.get(groupId); + + if (group == null || group.type() == CONSUMER) { + return false; + } + + ClassicGroup classicGroup = (ClassicGroup) group; + if (!classicGroup.isEmpty()) { + return false; + } else { + return true; + } + } + + /** + * Upgrade the empty classic group to a consumer group if it's valid. + * + * @param groupId The group id to be updated. + * @param records The list of records to delete the classic group and create the consumer group. + * @param isSimpleGroup The boolean indicating whether the group to be updated is a simple group. + */ + public void maybeUpgradeEmptyGroup(String groupId, List<Record> records, boolean isSimpleGroup) { + if (validateOfflineUpgrade(groupId)) { + final long currentTimeMs = time.milliseconds(); + ClassicGroup classicGroup = getOrMaybeCreateClassicGroup(groupId, false); + int groupEpoch = classicGroup.generationId(); + + // Replace the classic group with a new consumer group. + ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); + // We don't create the tombstone because the replay will remove the newly created consumer group. + removeGroup(groupId); + groups.put(groupId, consumerGroup); + metrics.onConsumerGroupStateTransition(null, consumerGroup.state()); + + if (!isSimpleGroup) { + records.add(newGroupSubscriptionMetadataRecord( + groupId, + consumerGroup.computeSubscriptionMetadata(classicGroup.subscribedTopics(), metadataImage.topics(), metadataImage.cluster()) + )); Review Comment: This does not seem necessary. If the group is empty, the subscribedTopics will also be empty. As I said earlier, we can let the handling of the new member create the new subscription. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org