dielhennr commented on a change in pull request #11141: URL: https://github.com/apache/kafka/pull/11141#discussion_r679380487
########## File path: core/src/main/scala/kafka/server/ConfigHelper.scala ########## @@ -22,20 +22,142 @@ import java.util.{Collections, Properties} import kafka.log.LogConfig import kafka.server.metadata.ConfigRepository import kafka.utils.{Log4jController, Logging} -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource} -import org.apache.kafka.common.errors.{ApiException, InvalidRequestException} +import org.apache.kafka.clients.admin.AlterConfigOp +import org.apache.kafka.clients.admin.AlterConfigOp.OpType +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} +import org.apache.kafka.common.config.ConfigDef.ConfigKey +import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidRequestException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource import org.apache.kafka.common.message.DescribeConfigsResponseData +import org.apache.kafka.server.policy.AlterConfigPolicy import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ApiError, DescribeConfigsResponse} import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource -import scala.collection.{Map, mutable} +import scala.collection.{Map, mutable, Seq} import scala.jdk.CollectionConverters._ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging { + val alterConfigPolicy = + Option(config.getConfiguredInstance(KafkaConfig.AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy])) + + def validateConfigPolicy(resource: ConfigResource, configEntriesMap: Map[String, String]): Unit = { + this.alterConfigPolicy match { + case Some(policy) => + policy.validate(new AlterConfigPolicy.RequestMetadata( + new ConfigResource(resource.`type`(), resource.name), configEntriesMap.asJava)) + case None => + } + } + + def getAndValidateBrokerId(resource: ConfigResource) = { + if (resource.name == null || resource.name.isEmpty) + None + else { + val id = resourceNameToBrokerId(resource.name) + if (id != this.config.brokerId) + throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}") + Some(id) + } + } + + def validateBrokerConfigs(resource: ConfigResource, Review comment: Separating validation step from the persisting step. Previously `alterBrokerConfig` in `ZkAdminManager` did both steps. This will be helpful when enabling request forwarding by default for ZK brokers as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org