dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&

Review Comment:
   I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` class. Could we use it?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {

Review Comment:
   Does it have to be public? Should we add some javadoc?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics());

Review Comment:
   I was wondering whether it would make more sense to do it the other way around and have something like `ConsumerGroup.fromClassicGroup(....)`. I guess that it does not really matter in the end.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }

Review Comment:
   I wonder whether we should log something (with the reason) when the upgrade is disallowed. Have you considered it?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   * `deserializeAssignment` and `deserializeSubscription` could throw a `SchemaException`, if I am not mistaken, when the bytes are invalid. We should handle those, I suppose.
   * We also discussed offline the need to keep a reference to those in the member's state. I don't see it in the patch. Do you plan to add it later on?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.

Review Comment:
   This seems to indicate that my earlier comment may be correct. The received `consumerGroup` is not yet the converted group but only an empty one. This is a bit confusing.
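A minimal sketch of the `ConsumerGroup.fromClassicGroup(....)` direction discussed above, which would also avoid handing an empty `consumerGroup` to the classic group. It assumes heavily simplified, hypothetical stand-ins (`ClassicGroupSnapshot`, `ConsumerGroupSketch`) rather than the coordinator's real classes, and models records as plain strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified view of a classic group: not the coordinator's ClassicGroup.
final class ClassicGroupSnapshot {
    final String groupId;
    final int generationId;
    // memberId -> subscribed topic names, in insertion order
    final Map<String, List<String>> subscriptions;

    ClassicGroupSnapshot(String groupId, int generationId, Map<String, List<String>> subscriptions) {
        this.groupId = groupId;
        this.generationId = generationId;
        this.subscriptions = subscriptions;
    }
}

// Hypothetical, simplified consumer group: not the coordinator's ConsumerGroup.
final class ConsumerGroupSketch {
    final String groupId;
    final int groupEpoch;
    final Map<String, List<String>> members;

    private ConsumerGroupSketch(String groupId, int groupEpoch, Map<String, List<String>> members) {
        this.groupId = groupId;
        this.groupEpoch = groupEpoch;
        this.members = Collections.unmodifiableMap(members);
    }

    // Factory in the spirit of ConsumerGroup.fromClassicGroup(....): the caller gets back a
    // fully populated group, so no empty "converted" group is passed in as a parameter.
    // Records are modeled as plain strings.
    static ConsumerGroupSketch fromClassicGroup(ClassicGroupSnapshot classic, List<String> records) {
        Map<String, List<String>> members = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : classic.subscriptions.entrySet()) {
            members.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            records.add("MemberSubscription(" + entry.getKey() + ")");
        }
        records.add("GroupEpoch(" + classic.generationId + ")");
        return new ConsumerGroupSketch(classic.groupId, classic.generationId, members);
    }
}
```

The point is only that construction and population happen in one place; a real factory would still need the snapshot registry, metrics, and topics image that the patch passes today.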
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.

Review Comment:
   Where is `GroupIdNotFoundException` thrown? I don't see it.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);

Review Comment:
   Should we put a comment explaining the logic here?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+    /** Both upgrade and downgrade are enabled.*/
+    BIDIRECTIONAL("bidirectional"),
+
+    /** Only upgrade is enabled.*/
+    UPGRADE("upgrade"),
+
+    /** Only downgrade is enabled.*/
+    DOWNGRADE("downgrade"),
+
+    /** Neither upgrade nor downgrade is enabled.*/
+    DISABLED("disabled");
+
+    private final String policy;
+
+    ConsumerGroupMigrationPolicy(String config) {
+        this.policy = config;
+    }
+
+    @Override
+    public String toString() {
+        return policy;
+    }
+
+    public static String validValuesDescription =
+        BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled" + ", " +
+        UPGRADE + ": only upgrade is enabled" + ", " +
+        DOWNGRADE + ": only downgrade is enabled" + ", " +
+        DISABLED + ": neither upgrade nor downgrade is enabled.";
+
+    private final static Map<String, ConsumerGroupMigrationPolicy> NAME_TO_ENUM = Arrays.stream(values())
+        .collect(Collectors.toMap(config -> config.policy.toLowerCase(Locale.ROOT), Function.identity()));
+
+    /**
+     * Parse a string into the corresponding {@code GroupProtocolMigrationPolicy} enum value, in a case-insensitive manner.
+     *
+     * @return The {{@link ConsumerGroupMigrationPolicy}} according to the string passed. None is returned if
+     * the string doesn't correspond to a valid policy.
+     */
+    public static ConsumerGroupMigrationPolicy parse(String name) {
+        if (name == null) {
+            return DISABLED;
+        }
+        ConsumerGroupMigrationPolicy config = NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
+
+        return config == null ? DISABLED : config;
+    }
+
+    public static boolean isUpgradeEnabled(ConsumerGroupMigrationPolicy policy) {
+        switch (policy) {
+            case BIDIRECTIONAL:
+            case UPGRADE:
+                return true;
+            case DOWNGRADE:
+            case DISABLED:
+            default:
+                return false;
+        }
+    }

Review Comment:
   I would suggest encapsulating this logic in the enum itself. One option would be to define `isUpgradeEnabled` as an abstract method in the enum and to override it in each constant. Take a look at the `CoordinatorState` enum. Another option would be to pass two booleans to the constructor along with the policy name and rely on them. In both cases, it would allow you to call `policy.isDowngradeEnabled()` or `policy.isUpgradeEnabled()`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, metadataImage.topics());
+
+        consumerGroup.members().forEach((memberId, __) ->
+            scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), memberId)

Review Comment:
   If the conversion fails, will those timers be handled appropriately (e.g. ignored)?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1244,6 +1267,24 @@ public boolean completeSyncFuture(
         return false;
     }

+    /**
+     * Complete all the awaiting sync future with the give error.
+     *
+     * @param error the error to complete the future with.
+     */
+    public void completeAllSyncFutures(
+        Errors error
+    ) {
+        members.forEach((__, member) -> completeSyncFuture(
+            member,
+            new SyncGroupResponseData()
+                .setProtocolName(protocolName.orElse(null))
+                .setProtocolType(protocolType.orElse(null))
+                .setAssignment(member.assignment())

Review Comment:
   Do we have to set the assignment when returning an error? Same question for the protocol name/type.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {

Review Comment:
   The order of the records matters here. We should have the following order:
   1. Member subscriptions.
   2. Group epoch.
   3. Member target assignment.
   4. Target assignment epoch.
   5. Member current assignments.
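To make the requested ordering concrete, here is a sketch that emits records in exactly the five steps listed above for a set of members. `ConversionRecordOrder` is hypothetical, records are plain strings standing in for the `RecordHelpers` calls in the patch, and the subscription metadata record, which the list does not place, is left out:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: records are modeled as strings, one per RecordHelpers call.
final class ConversionRecordOrder {

    // assignmentByMember: memberId -> an opaque description of that member's partitions.
    static List<String> conversionRecords(String groupId, int epoch, Map<String, String> assignmentByMember) {
        List<String> records = new ArrayList<>();
        // 1. Member subscriptions.
        assignmentByMember.keySet().forEach(m -> records.add("MemberSubscription(" + groupId + "," + m + ")"));
        // 2. Group epoch.
        records.add("GroupEpoch(" + groupId + "," + epoch + ")");
        // 3. Member target assignment.
        assignmentByMember.forEach((m, a) -> records.add("TargetAssignment(" + groupId + "," + m + "," + a + ")"));
        // 4. Target assignment epoch.
        records.add("TargetAssignmentEpoch(" + groupId + "," + epoch + ")");
        // 5. Member current assignments.
        assignmentByMember.forEach((m, a) -> records.add("CurrentAssignment(" + groupId + "," + m + "," + a + ")"));
        return records;
    }
}
```

Grouping the per-member records by kind, rather than emitting all three records per member in one loop iteration as the patch does, is what lets the two epoch records sit between them.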
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupMember.java:
##########
@@ -199,6 +199,17 @@ public byte[] metadata(String protocolName) {
             protocolName);
     }

+    /**
+     * Get the metadata of any supported protocol.
+     */
+    public byte[] metadata() {

Review Comment:
   nit: preferredSupportedProtocolMetadata? Or something along those lines.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), generationId));
+        // SubscriptionMetadata will be computed in the following consumerGroupHeartbeat
+        records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), generationId));
+
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
+
+            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(generationId)
+                .setPreviousMemberEpoch(generationId)
+                .setInstanceId(member.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                .setClientId(member.clientId())
+                .setClientHost(member.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .build();
+            consumerGroup.updateMember(newMember);
+
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), newMember));
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), newMember));
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), memberId, partitions));
+        });
+    }
+
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+            // TODO: add a log if topic image is null?

Review Comment:
   I suppose that we could just ignore them. If the image is null, the topic does not exist, so its partitions must be removed anyway.
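A sketch of `topicPartitionMapFromList` that simply skips topics missing from the image, as suggested above. A plain name-to-id map stands in for `TopicsImage`, the nested `NamedPartition` class is a hypothetical stand-in for `TopicPartition`, and `java.util.UUID` replaces Kafka's `Uuid`:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper mirroring topicPartitionMapFromList from the diff.
final class TopicPartitionMapping {

    // Hypothetical stand-in for TopicPartition.
    static final class NamedPartition {
        final String topic;
        final int partition;

        NamedPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // topicIdsByName stands in for TopicsImage: a missing entry means the topic does not exist.
    static Map<UUID, Set<Integer>> topicPartitionMapFromList(
        List<NamedPartition> partitions,
        Map<String, UUID> topicIdsByName
    ) {
        Map<UUID, Set<Integer>> result = new HashMap<>();
        for (NamedPartition tp : partitions) {
            UUID topicId = topicIdsByName.get(tp.topic);
            if (topicId == null) {
                // Unknown topic: its partitions would be revoked anyway, so drop them silently.
                continue;
            }
            result.computeIfAbsent(topicId, id -> new HashSet<>()).add(tp.partition);
        }
        return result;
    }
}
```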
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }

+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) &&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);

Review Comment:
   nit: You could directly use `classicGroup.createGroupTombstoneRecords(records);`.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
             ));
     }

+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup The converted consumer group.
+     * @param records The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);

Review Comment:
   nit: From a structural perspective, it would be great if we could pair each record creation with the corresponding group update. It would be easier to read.
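A sketch of what pairing each group update with its record could look like, limited to the two epoch updates in the quoted hunk. `PairedEpochUpdates` is a hypothetical, simplified group and records are plain strings:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified group: only the two epochs set in the quoted hunk are modeled.
final class PairedEpochUpdates {
    private int groupEpoch = -1;
    private int targetAssignmentEpoch = -1;
    private final List<String> records = new ArrayList<>();

    void convertEpochs(String groupId, int generationId) {
        // Group epoch: the in-memory update is immediately followed by the record persisting it.
        groupEpoch = generationId;
        records.add("GroupEpoch(" + groupId + "," + generationId + ")");

        // Per-member records are not modeled in this sketch.

        // Target assignment epoch: same pairing.
        targetAssignmentEpoch = generationId;
        records.add("TargetAssignmentEpoch(" + groupId + "," + generationId + ")");
    }

    int groupEpoch() { return groupEpoch; }
    int targetAssignmentEpoch() { return targetAssignmentEpoch; }
    List<String> records() { return records; }
}
```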
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupMember.java:
##########
@@ -199,6 +199,17 @@ public byte[] metadata(String protocolName) {
             protocolName);
     }

+    /**
+     * Get the metadata of any supported protocol.
+     */
+    public byte[] metadata() {
+        for (JoinGroupRequestProtocol supportedProtocol : supportedProtocols) {
+            return supportedProtocol.metadata();
+        }
+
+        throw new IllegalArgumentException("Member does not support any protocol.");

Review Comment:
   Interesting way to code this. Intuitively, I would have checked whether `supportedProtocols.isEmpty()`, thrown if so, and otherwise returned the first element. I find it a bit more explicit.
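The explicit variant described above could look roughly like this. `MemberProtocolsSketch` is hypothetical, supported protocols are modeled as a plain `Collection` of metadata byte arrays (assumed to be in the member's order of preference), and the method name follows the naming nit rather than the patch:

```java
import java.util.Collection;

// Hypothetical stand-in for ClassicGroupMember's supported-protocol handling.
final class MemberProtocolsSketch {
    // Metadata of each supported protocol, assumed to be in order of preference.
    private final Collection<byte[]> supportedProtocolMetadata;

    MemberProtocolsSketch(Collection<byte[]> supportedProtocolMetadata) {
        this.supportedProtocolMetadata = supportedProtocolMetadata;
    }

    byte[] preferredSupportedProtocolMetadata() {
        // Explicit emptiness check first, with the same message as the patch.
        if (supportedProtocolMetadata.isEmpty()) {
            throw new IllegalArgumentException("Member does not support any protocol.");
        }
        // Then take the first element directly instead of returning from inside a loop.
        return supportedProtocolMetadata.iterator().next();
    }
}
```

Using `iterator().next()` keeps the sketch valid for any `Collection`, not just a `List`.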