dielhennr commented on a change in pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#discussion_r694353729



##########
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##########
@@ -22,20 +22,125 @@ import java.util.{Collections, Properties}
 import kafka.log.LogConfig
 import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.clients.admin.AlterConfigOp.OpType
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, LogLevelConfig}
+import org.apache.kafka.common.config.ConfigDef.ConfigKey
+import org.apache.kafka.common.errors.{ApiException, 
InvalidConfigurationException, InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource
 import org.apache.kafka.common.message.DescribeConfigsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse}
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 
-import scala.collection.{Map, mutable}
+import scala.collection.{Map, mutable, Seq}
 import scala.jdk.CollectionConverters._
 
 class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {
 
+  def getAndValidateBrokerId(resource: ConfigResource) = {
+    if (resource.name == null || resource.name.isEmpty)
+      None
+    else {
+      val id = resourceNameToBrokerId(resource.name)
+      if (id != this.config.brokerId)
+        throw new InvalidRequestException(s"Unexpected broker id, expected 
${this.config.brokerId}, but received ${resource.name}")
+      Some(id)
+    }
+  }
+
+  def validateBrokerConfigs(resource: ConfigResource, 
+                            validateOnly: Boolean, 
+                            configProps: Properties): (ConfigResource, 
ApiError) = {
+    val brokerId = getAndValidateBrokerId(resource)
+    val perBrokerConfig = brokerId.nonEmpty
+    // Check that there are no static configs being altered
+    DynamicConfig.Broker.validate(configProps)
+    // Validate and process the reconfiguration
+    this.config.dynamicConfig.validate(configProps, perBrokerConfig)
+    if (!validateOnly) {
+      if (perBrokerConfig)
+        
this.config.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(configProps)
+    }
+
+    resource -> ApiError.NONE
+  }
+
+  def prepareIncrementalConfigs(alterConfigOps: Seq[AlterConfigOp], 
configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {

Review comment:
       `prepareIncrementalConfigs`, `validateLogLevelConfigs`, 
`toLoggableProps`, and `getAndValidateBrokerId` (formerly `getBrokerId`) were 
all helper methods in `ZkAdminManager`. I've moved them into `ConfigHelper` so 
that they can also be used to validate broker configs for KRaft brokers.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to