dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1553514811


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&

Review Comment:
   I think that we have `usesConsumerGroupProtocol()` in the `ClassicGroup` 
class. Could we use it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {

Review Comment:
   Does it have to be public? Should we add some javadoc?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, 
metadataImage.topics());

Review Comment:
   I was wondering whether it would make more sense the other way around and 
have something like `ConsumerGroup.fromClassicGroup(....)`. I guess that it 
does not really matter in the end.
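
To make the alternative concrete, here is a rough sketch of the static-factory shape. `ClassicGroupSketch` and `ConsumerGroupSketch` are hypothetical stand-ins that carry only a group id, an epoch, and a member map; they are not the real coordinator classes, and the real conversion would also need to emit records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ClassicGroup: only the fields this sketch needs.
final class ClassicGroupSketch {
    final String groupId;
    final int generationId;
    // member id -> client id
    final Map<String, String> memberClientIds = new LinkedHashMap<>();

    ClassicGroupSketch(String groupId, int generationId) {
        this.groupId = groupId;
        this.generationId = generationId;
    }
}

// Hypothetical stand-in for ConsumerGroup.
final class ConsumerGroupSketch {
    final String groupId;
    int groupEpoch;
    final Map<String, String> members = new LinkedHashMap<>();

    private ConsumerGroupSketch(String groupId) {
        this.groupId = groupId;
    }

    // The factory returns a fully populated group, so callers never
    // observe an empty, half-converted instance.
    static ConsumerGroupSketch fromClassicGroup(ClassicGroupSketch classicGroup) {
        ConsumerGroupSketch group = new ConsumerGroupSketch(classicGroup.groupId);
        group.groupEpoch = classicGroup.generationId;
        group.members.putAll(classicGroup.memberClientIds);
        return group;
    }
}
```

With this shape the caller would write `ConsumerGroupSketch.fromClassicGroup(classicGroup)` instead of creating an empty group and passing it in.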



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }

Review Comment:
   I wonder whether we should log something (with the reason) when the upgrade 
is disallowed. Have you considered it?
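
A minimal sketch of what I have in mind, assuming the four conditions are evaluated one by one so that the first failing one can be reported. The class name, the flattened boolean parameters, and the use of `System.err` in place of the coordinator's logger are all assumptions made for illustration.

```java
import java.util.Optional;

final class OnlineUpgradeCheckSketch {
    // Returns the first reason the upgrade is disallowed, or empty if it is allowed.
    static Optional<String> rejectionReason(
        boolean upgradeEnabled,
        boolean groupIsDead,
        boolean usesConsumerProtocol,
        int groupSize,
        int maxSize
    ) {
        if (!upgradeEnabled) {
            return Optional.of("the migration policy does not allow upgrades");
        }
        if (groupIsDead) {
            return Optional.of("the group is dead");
        }
        if (!usesConsumerProtocol) {
            return Optional.of("the group does not use the consumer embedded protocol");
        }
        if (groupSize > maxSize) {
            return Optional.of("the group size " + groupSize + " exceeds the maximum of " + maxSize);
        }
        return Optional.empty();
    }

    static boolean validateOnlineUpgrade(
        String groupId,
        boolean upgradeEnabled,
        boolean groupIsDead,
        boolean usesConsumerProtocol,
        int groupSize,
        int maxSize
    ) {
        Optional<String> reason = rejectionReason(
            upgradeEnabled, groupIsDead, usesConsumerProtocol, groupSize, maxSize);
        // System.err stands in for the real logger here.
        reason.ifPresent(r -> System.err.println(
            "Cannot upgrade classic group " + groupId + " to a consumer group because " + r + "."));
        return !reason.isPresent();
    }
}
```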



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
generationId));
+        // SubscriptionMetadata will be computed in the following 
consumerGroupHeartbeat
+        
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), 
Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
generationId));
+        
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));

Review Comment:
   * `deserializeAssignment` and `deserializeSubscription` could throw a 
`SchemaException`, if I am not mistaken, when the bytes are malformed. We 
should handle those, I suppose.
   * We also discussed offline the need to keep a reference to those in the 
member's state. I don't see it in the patch. Do you plan to add it later on?
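
For the first point, one possible shape is to guard each deserialization call and turn a failure into an explicit "cannot convert" outcome. The sketch below is only illustrative: `SchemaExceptionSketch` and `deserializeVersion` are hypothetical stand-ins for `SchemaException` and the `ConsumerProtocol` deserializers, and returning an empty `Optional` is just one way to surface the failure.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

final class DeserializationGuardSketch {
    // Hypothetical stand-in for Kafka's SchemaException.
    static final class SchemaExceptionSketch extends RuntimeException {
        SchemaExceptionSketch(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for a deserializer: reads a 2-byte version
    // and rejects buffers that are too short to contain one.
    static short deserializeVersion(ByteBuffer buffer) {
        if (buffer.remaining() < 2) {
            throw new SchemaExceptionSketch("Buffer too small to hold a version");
        }
        return buffer.getShort();
    }

    // Guarded variant: malformed bytes yield an empty result instead of
    // an exception escaping from the middle of the conversion.
    static Optional<Short> tryDeserializeVersion(byte[] bytes) {
        try {
            return Optional.of(deserializeVersion(ByteBuffer.wrap(bytes)));
        } catch (SchemaExceptionSketch e) {
            return Optional.empty();
        }
    }
}
```

The caller could then abort the upgrade before any record is appended when a member's bytes cannot be read.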



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.

Review Comment:
   This seems to indicate that my earlier comment may be correct. The received 
`consumerGroup` is not yet the converted group but only an empty one. This is a 
bit confusing.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.

Review Comment:
   Where is `GroupIdNotFoundException` thrown? I don't see it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);

Review Comment:
   Should we put a comment explaining the logic here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ConsumerGroupMigrationPolicy.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupMigrationPolicy {
+    /** Both upgrade and downgrade are enabled.*/
+    BIDIRECTIONAL("bidirectional"),
+
+    /** Only upgrade is enabled.*/
+    UPGRADE("upgrade"),
+
+    /** Only downgrade is enabled.*/
+    DOWNGRADE("downgrade"),
+
+    /** Neither upgrade nor downgrade is enabled.*/
+    DISABLED("disabled");
+
+    private final String policy;
+
+    ConsumerGroupMigrationPolicy(String config) {
+        this.policy = config;
+    }
+
+    @Override
+    public String toString() {
+        return policy;
+    }
+
+    public static String validValuesDescription =
+        BIDIRECTIONAL   + ": both upgrade from classic group to consumer group 
and downgrade from consumer group to classic group are enabled" + ", " +
+        UPGRADE         + ": only upgrade is enabled" + ", " +
+        DOWNGRADE       + ": only downgrade is enabled" + ", " +
+        DISABLED        + ": neither upgrade nor downgrade is enabled.";
+
+    private final static Map<String, ConsumerGroupMigrationPolicy> 
NAME_TO_ENUM = Arrays.stream(values())
+        .collect(Collectors.toMap(config -> 
config.policy.toLowerCase(Locale.ROOT), Function.identity()));
+
+    /**
+     * Parse a string into the corresponding {@code 
GroupProtocolMigrationPolicy} enum value, in a case-insensitive manner.
+     *
+     * @return The {{@link ConsumerGroupMigrationPolicy}} according to the 
string passed. None is returned if
+     * the string doesn't correspond to a valid policy.
+     */
+    public static ConsumerGroupMigrationPolicy parse(String name) {
+        if (name == null) {
+            return DISABLED;
+        }
+        ConsumerGroupMigrationPolicy config = 
NAME_TO_ENUM.get(name.toLowerCase(Locale.ROOT));
+
+        return config == null ? DISABLED : config;
+    }
+
+    public static boolean isUpgradeEnabled(ConsumerGroupMigrationPolicy 
policy) {
+        switch (policy) {
+            case BIDIRECTIONAL:
+            case UPGRADE:
+                return true;
+            case DOWNGRADE:
+            case DISABLED:
+            default:
+                return false;
+        }
+    }

Review Comment:
   I would suggest encapsulating this logic in the enum itself. One option 
would be to define `isUpgradeEnabled` as an abstract method in the enum and 
then override it in each constant. Take a look at the `CoordinatorState` 
enum. Another option would be to pass two booleans along with the policy and 
rely on them. In both cases, it will allow you to do 
`policy.isDowngradeEnabled` or `policy.isUpgradeEnabled`.
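
A minimal sketch of the second option, with the two booleans passed to the constructor. The enum is named `MigrationPolicySketch` to make clear that it is an illustration and not the final class; the parsing logic from the patch is left out.

```java
enum MigrationPolicySketch {
    /** Both upgrade and downgrade are enabled. */
    BIDIRECTIONAL("bidirectional", true, true),

    /** Only upgrade is enabled. */
    UPGRADE("upgrade", true, false),

    /** Only downgrade is enabled. */
    DOWNGRADE("downgrade", false, true),

    /** Neither upgrade nor downgrade is enabled. */
    DISABLED("disabled", false, false);

    private final String policy;
    private final boolean upgradeEnabled;
    private final boolean downgradeEnabled;

    MigrationPolicySketch(String policy, boolean upgradeEnabled, boolean downgradeEnabled) {
        this.policy = policy;
        this.upgradeEnabled = upgradeEnabled;
        this.downgradeEnabled = downgradeEnabled;
    }

    // Each constant answers for itself, so no switch statement is needed.
    boolean isUpgradeEnabled() {
        return upgradeEnabled;
    }

    boolean isDowngradeEnabled() {
        return downgradeEnabled;
    }

    @Override
    public String toString() {
        return policy;
    }
}
```

The call site then becomes `consumerGroupMigrationPolicy.isUpgradeEnabled()`, and adding a new policy forces the author to state both flags.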
   
   



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);
+        ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
classicGroup.groupId(), metrics);
+        classicGroup.convertToConsumerGroup(consumerGroup, records, 
metadataImage.topics());
+
+        consumerGroup.members().forEach((memberId, __) ->
+            scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), 
memberId)

Review Comment:
   If the conversion fails, will those timers be handled appropriately (e.g. 
ignored)? 



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1244,6 +1267,24 @@ public boolean completeSyncFuture(
         return false;
     }
 
+    /**
+     * Complete all the awaiting sync future with the give error.
+     *
+     * @param error  the error to complete the future with.
+     */
+    public void completeAllSyncFutures(
+        Errors error
+    ) {
+        members.forEach((__, member) -> completeSyncFuture(
+            member,
+            new SyncGroupResponseData()
+                .setProtocolName(protocolName.orElse(null))
+                .setProtocolType(protocolType.orElse(null))
+                .setAssignment(member.assignment())

Review Comment:
   Do we have to set the assignment when returning an error? Same question for 
protocol name/type.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {

Review Comment:
   The order of the records matters here. We should have the following order:
   1. Member subscriptions.
   2. Group epoch.
   3. Member target assignment.
   4. Target assignment epoch.
   5. Member current assignments.
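
The ordering above can be sketched as three passes over the members with the two epoch records in between. The strings below are hypothetical placeholders for the real records, and only the five record kinds listed above are modelled.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordOrderSketch {
    // Builds placeholder record names in the order listed above.
    static List<String> conversionRecords(List<String> memberIds) {
        List<String> records = new ArrayList<>();

        // 1. Member subscriptions.
        for (String memberId : memberIds) {
            records.add("MemberSubscription(" + memberId + ")");
        }
        // 2. Group epoch.
        records.add("GroupEpoch");
        // 3. Member target assignments.
        for (String memberId : memberIds) {
            records.add("TargetAssignment(" + memberId + ")");
        }
        // 4. Target assignment epoch.
        records.add("TargetAssignmentEpoch");
        // 5. Member current assignments.
        for (String memberId : memberIds) {
            records.add("CurrentAssignment(" + memberId + ")");
        }
        return records;
    }
}
```

This differs from the single `members.forEach` in the patch, which interleaves the three per-member records and writes the epoch records first.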



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupMember.java:
##########
@@ -199,6 +199,17 @@ public byte[] metadata(String protocolName) {
             protocolName);
     }
 
+    /**
+     * Get the metadata of any supported protocol.
+     */
+    public byte[] metadata() {

Review Comment:
   nit: preferredSupportedProtocolMetadata? Or something along those lines.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);
+
+        records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
generationId));
+        // SubscriptionMetadata will be computed in the following 
consumerGroupHeartbeat
+        
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId(), 
Collections.emptyMap()));
+        records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
generationId));
+        
+        members.forEach((memberId, member) -> {
+            ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment()));
+            Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+            ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(member.metadata()));
+
+            ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(memberId)
+                .setMemberEpoch(generationId)
+                .setPreviousMemberEpoch(generationId)
+                .setInstanceId(member.groupInstanceId().orElse(null))
+                .setRackId(subscription.rackId().orElse(null))
+                .setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
+                .setClientId(member.clientId())
+                .setClientHost(member.clientHost())
+                .setSubscribedTopicNames(subscription.topics())
+                .setAssignedPartitions(partitions)
+                .build();
+            consumerGroup.updateMember(newMember);
+
+            records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
newMember));
+            records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
newMember));
+            records.add(RecordHelpers.newTargetAssignmentRecord(groupId(), 
memberId, partitions));
+        });
+    }
+
+    private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+        List<TopicPartition> partitions,
+        TopicsImage topicsImage
+    ) {
+        Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+        partitions.forEach(topicPartition -> {
+            TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+            // TODO: add a log if topic image is null?

Review Comment:
   I suppose that we could just ignore them. If the image is null, the topic 
does not exist so it must be removed anyway.
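
Something along these lines, as a sketch only: a plain `Map<String, UUID>` stands in for `TopicsImage`, `UUID` for Kafka's `Uuid`, and `TopicPartitionSketch` for `TopicPartition`, all of which are assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class TopicPartitionMapSketch {
    // Hypothetical stand-in for TopicPartition.
    static final class TopicPartitionSketch {
        final String topic;
        final int partition;

        TopicPartitionSketch(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static Map<UUID, Set<Integer>> topicPartitionMapFromList(
        List<TopicPartitionSketch> partitions,
        Map<String, UUID> topicIdsByName
    ) {
        Map<UUID, Set<Integer>> topicPartitionMap = new HashMap<>();
        for (TopicPartitionSketch topicPartition : partitions) {
            UUID topicId = topicIdsByName.get(topicPartition.topic);
            // An unknown topic no longer exists, so its partitions are dropped.
            if (topicId == null) {
                continue;
            }
            topicPartitionMap
                .computeIfAbsent(topicId, id -> new HashSet<>())
                .add(topicPartition.partition);
        }
        return topicPartitionMap;
    }
}
```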



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -761,6 +777,31 @@ public ClassicGroup classicGroup(
         }
     }
 
+    public boolean validateOnlineUpgrade(ClassicGroup classicGroup) {
+        return 
ConsumerGroupMigrationPolicy.isUpgradeEnabled(consumerGroupMigrationPolicy) &&
+            !classicGroup.isInState(DEAD) &&
+            
ConsumerProtocol.PROTOCOL_TYPE.equals(classicGroup.protocolType().orElse(null)) 
&&
+            classicGroup.size() <= consumerGroupMaxSize;
+    }
+
+    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, 
List<Record> records) {
+        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
+        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
+
+        createGroupTombstoneRecords(classicGroup, records);

Review Comment:
   nit: You could directly use 
`classicGroup.createGroupTombstoneRecords(records);`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java:
##########
@@ -1300,6 +1341,68 @@ public Map<String, byte[]> groupAssignment() {
         ));
     }
 
+    /**
+     * Convert the current classic group to a consumer group.
+     * Add the records for the conversion.
+     *
+     * @param consumerGroup     The converted consumer group.
+     * @param records           The list to which the new records are added.
+     *
+     * @throws GroupIdNotFoundException if any of the group's member doesn't 
support the consumer protocol.
+     */
+    public void convertToConsumerGroup(
+        ConsumerGroup consumerGroup,
+        List<Record> records,
+        TopicsImage topicsImage
+    ) throws GroupIdNotFoundException {
+        consumerGroup.setGroupEpoch(generationId);
+        consumerGroup.setTargetAssignmentEpoch(generationId);

Review Comment:
   nit: From a structure perspective, it would be great if we could pair the 
record creation with the group update. It will be easier to read.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroupMember.java:
##########
@@ -199,6 +199,17 @@ public byte[] metadata(String protocolName) {
             protocolName);
     }
 
+    /**
+     * Get the metadata of any supported protocol.
+     */
+    public byte[] metadata() {
+        for (JoinGroupRequestProtocol supportedProtocol : supportedProtocols) {
+            return supportedProtocol.metadata();
+        }
+
+        throw new IllegalArgumentException("Member does not support any 
protocol.");

Review Comment:
   Interesting way to code this. Intuitively, I would have checked whether 
`supportedProtocols.isEmpty()`, thrown if so, and otherwise returned the first 
element. I find that a bit more explicit.
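
Roughly like this, as a sketch: `ProtocolSketch` is a hypothetical stand-in for `JoinGroupRequestProtocol`, and the method takes a plain `Collection` so that it relies only on `isEmpty()` and `iterator()`. The method name follows the rename suggested in my other comment.

```java
import java.util.Collection;

final class PreferredProtocolSketch {
    // Hypothetical stand-in for JoinGroupRequestProtocol.
    static final class ProtocolSketch {
        final String name;
        final byte[] metadata;

        ProtocolSketch(String name, byte[] metadata) {
            this.name = name;
            this.metadata = metadata;
        }
    }

    static byte[] preferredSupportedProtocolMetadata(Collection<ProtocolSketch> supportedProtocols) {
        // Fail fast on the empty case, then read the first element.
        if (supportedProtocols.isEmpty()) {
            throw new IllegalArgumentException("Member does not support any protocol.");
        }
        return supportedProtocols.iterator().next().metadata;
    }
}
```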



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
