dajac commented on code in PR #15411:
URL: https://github.com/apache/kafka/pull/15411#discussion_r1551485397


##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   I am still not fully satisfied with this property because 
`group.consumer.upgrade.policy = upgrade` reads weird. What could we use to 
replace `upgrade` here? `conversion`? `migration`?



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"

Review Comment:
   nit: While we are here, could you add the missing space after `=`?



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val GroupConsumerUpgradePolicyDoc = "The config that enables the group 
protocol upgrade/downgrade. The valid values are " + 
Utils.join(Utils.enumOptions(classOf[GroupConsumerUpgradePolicy]), ",") + "."

Review Comment:
   It would be great if we could extend the documentation here a little bit. I 
think that we need to call out that this is about converting classic groups 
using the consumer embedded protocol to the consumer group protocol and vice 
versa. We could also call out the various policies with a small description of each.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConsumerUpgradePolicy.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupConsumerUpgradePolicy {

Review Comment:
   nit: `GroupConsumer` to `ConsumerGroup`.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -1700,6 +1704,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val consumerGroupMaxHeartbeatIntervalMs = 
getInt(KafkaConfig.ConsumerGroupMaxHeartbeatIntervalMsProp)
   val consumerGroupMaxSize = getInt(KafkaConfig.ConsumerGroupMaxSizeProp)
   val consumerGroupAssignors = 
getConfiguredInstances(KafkaConfig.ConsumerGroupAssignorsProp, 
classOf[PartitionAssignor])
+  val groupConsumerUpgradePolicy = 
GroupConsumerUpgradePolicy.parse(getString(KafkaConfig.GroupConsumerUpgradePolicyProp))

Review Comment:
   nit: `consumerGroup..`



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -251,6 +252,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsProp 
="group.consumer.max.heartbeat.interval.ms"
   val ConsumerGroupMaxSizeProp = "group.consumer.max.size"
   val ConsumerGroupAssignorsProp = "group.consumer.assignors"
+  val GroupConsumerUpgradePolicyProp = "group.consumer.upgrade.policy"

Review Comment:
   nit: `ConsumerGroupUpgradePolicyProp`



##########
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##########
@@ -151,6 +152,7 @@ public class Defaults {
         UniformAssignor.class.getName(),
         RangeAssignor.class.getName()
     );
+    public static final String GROUP_CONSUMER_UPGRADE_POLICY = 
GroupConsumerUpgradePolicy.DISABLED.toString();

Review Comment:
   nit: `CONSUMER_GROUP_`



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -677,6 +679,7 @@ object KafkaConfig {
   val ConsumerGroupMaxHeartbeatIntervalMsDoc = "The maximum heartbeat interval 
for registered consumers."
   val ConsumerGroupMaxSizeDoc = "The maximum number of consumers that a single 
consumer group can accommodate."
   val ConsumerGroupAssignorsDoc = "The server side assignors as a list of full 
class names. The first one in the list is considered as the default assignor to 
be used in the case where the consumer does not specify an assignor."
+  val GroupConsumerUpgradePolicyDoc = "The config that enables the group 
protocol upgrade/downgrade. The valid values are " + 
Utils.join(Utils.enumOptions(classOf[GroupConsumerUpgradePolicy]), ",") + "."

Review Comment:
   nit: `ConsumerGroup...`



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1831,6 +1832,20 @@ class KafkaConfigTest {
     assertTrue(config.isNewGroupCoordinatorEnabled)
   }
 
+  @Test
+  def testGroupProtocolMigrationPolicy(): Unit = {
+    val props = new Properties()
+    props.putAll(kraftProps())
+
+    // Invalid GroupProtocolMigrationPolicy value.
+    props.put(KafkaConfig.GroupConsumerUpgradePolicyProp, "foo")
+    assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
+
+    props.put(KafkaConfig.GroupConsumerUpgradePolicyProp, "upgrade")
+    val config = KafkaConfig.fromProps(props)
+    assertEquals(GroupConsumerUpgradePolicy.UPGRADE, 
config.groupConsumerUpgradePolicy)

Review Comment:
   Is it worth checking all the options, and with different cases too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to