apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r512951535
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = { // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if // the rate limit increases, because it is just one connection per listener and the code is simpler that way - updateConnectionRateQuota(maxConnectionRate) + updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity) + } + + /** + * Update the connection rate quota for a given IP and updates quota configs for updated IPs. + * If an IP is given, metric config will be updated only for the given IP, otherwise + * all metric configs will be checked and updated if required + * + * @param ip ip to update or default if None + * @param maxConnectionRate new connection rate, or resets entity to default if None + */ + def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = { + def isIpConnectionRateMetric(metricName: MetricName) = { + metricName.name == ConnectionRateMetricName && + metricName.group == MetricsGroup && + metricName.tags.containsKey(IpMetricTag) + } + + def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = { + quotaLimit != metric.config.quota.bound + } + + ip match { + case Some(addr) => + val address = InetAddress.getByName(addr) + maxConnectionRate match { + case Some(rate) => + info(s"Updating max connection rate override for $address to $rate") + connectionRatePerIp.put(address, rate) + case None => + info(s"Removing max connection rate override for $address") + connectionRatePerIp.remove(address) + } + updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address)) + case None => + val newQuota = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate) + info(s"Updating default max IP connection 
rate to $newQuota") + defaultConnectionRatePerIp = newQuota + val allMetrics = metrics.metrics + allMetrics.forEach { (metricName, metric) => + if (isIpConnectionRateMetric(metricName) && shouldUpdateQuota(metric, newQuota)) { + info(s"Updating existing connection rate sensor for ${metricName.tags} to $newQuota") + metric.config(rateQuotaMetricConfig(newQuota)) + } Review comment: @splett2 Maybe I was not looking at the right place in this PR, but does this PR handle the case where someone sets a non-unlimited per-IP default quota, i.e. the /config/ips/<default> znode? If a default is set and it is not unlimited, removing the quota for an IP should fall back to the configured IP default, and if no IP default is set, it should fall back to `DynamicConfig.Ip.DefaultConnectionCreationRate`. I think this means that we need to have a "<default>" IP entry in `connectionRatePerIp` if the default is not unlimited, and use it when creating the per-IP sensor with the right quota. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org