AndrewJSchofield commented on code in PR #15067:
URL: https://github.com/apache/kafka/pull/15067#discussion_r1446139219


##########
clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java:
##########
@@ -223,6 +223,7 @@ public enum ConfigSource {
         DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is 
configured for a specific broker
         DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is 
configured as default for all brokers in the cluster
         DYNAMIC_CLIENT_METRICS_CONFIG,  // dynamic client metrics subscription 
config that is configured for all clients
+        DYNAMIC_CONSUMER_GROUP_CONFIG,  // dynamic consumer group config that 
is configured for a specific consumer group

Review Comment:
   This should be `DYNAMIC_GROUP_CONFIG` because it applies to group not just a 
consumer group.



##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java:
##########
@@ -120,7 +121,8 @@ public enum ConfigSource {
         STATIC_BROKER_CONFIG((byte) 4, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
         DEFAULT_CONFIG((byte) 5, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DEFAULT_CONFIG),
         DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG),
-        CLIENT_METRICS_CONFIG((byte) 7, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG);
+        CLIENT_METRICS_CONFIG((byte) 7, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG),
+        CONSUMER_GROUP_CONFIG((byte) 8, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.DYNAMIC_CONSUMER_GROUP_CONFIG);

Review Comment:
   `GROUP_CONFIG`



##########
core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala:
##########
@@ -114,6 +114,18 @@ class DynamicConfigPublisher(
                     s"${resource.name()} with new configuration: 
${toLoggableProps(resource, props).mkString(",")} " +
                     s"in $deltaName", t)
                 })
+            case GROUP =>
+              // Apply changes to a group's dynamic configuration.
+              
dynamicConfigHandlers.get(ConfigType.GROUP).foreach(consumerGroupConfigHandler 
=>

Review Comment:
   "group" rather than "consumer group" please.



##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -113,6 +121,22 @@ class ControllerConfigurationValidator(kafkaConfig: 
KafkaConfig) extends Configu
         val properties = new Properties()
         config.entrySet().forEach(e => properties.setProperty(e.getKey, 
e.getValue))
         ClientMetricsConfigs.validate(resource.name(), properties)
+      case GROUP =>
+        validateGroupName(resource.name())
+        val properties = new Properties()
+        val nullConsumerGroupConfigs = new mutable.ArrayBuffer[String]()
+        config.entrySet().forEach(e => {
+          if (e.getValue == null) {
+            nullConsumerGroupConfigs += e.getKey
+          } else {
+            properties.setProperty(e.getKey, e.getValue)
+          }
+        })
+        if (nullConsumerGroupConfigs.nonEmpty) {
+          throw new InvalidConfigurationException("Null value not supported 
for consumer group configs: " +

Review Comment:
   Not consumer group configs.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -353,7 +358,8 @@ class BrokerServer(
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, 
quotaManagers, None),
         ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers),
-        ConfigType.CLIENT_METRICS -> new 
ClientMetricsConfigHandler(clientMetricsManager))
+        ConfigType.CLIENT_METRICS -> new 
ClientMetricsConfigHandler(clientMetricsManager),
+        ConfigType.GROUP -> new 
ConsumerGroupConfigHandler(consumerGroupConfigManager))

Review Comment:
   `GroupConfigHandler` and `groupConfigManager`



##########
core/src/main/scala/kafka/server/ConfigHandler.scala:
##########
@@ -264,3 +265,12 @@ class ClientMetricsConfigHandler(private val 
clientMetricsManager: ClientMetrics
     clientMetricsManager.updateSubscription(subscriptionGroupId, properties)
   }
 }
+
+/**
+ * The GroupConfigHandler will process individual group config changes in ZK.
+ */
+class ConsumerGroupConfigHandler(private val consumerGroupConfigManager: 
ConsumerGroupConfigManager) extends ConfigHandler with Logging {

Review Comment:
   `GroupConfigHandler` like the comment please :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to