splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r521856417



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1324,7 +1401,59 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    counts.synchronized {

Review comment:
       After thinking on this a bit more, I think that only locking on updates 
to `defaultConnectionRatePerIp` should be sufficient for correctness.
   
   ZK dynamic config changes are processed within one thread, so we will only 
have one thread executing in `updateIpConnectionRateQuota`. If we want to be 
really careful about this, we can have `updateIpConnectionRateQuota` 
synchronized on `ConnectionQuotas`.
   
   The case we want to avoid by synchronizing on `counts` is this: thread 1 
reads `defaultConnectionRatePerIp` as connection rate limit `A` while calling 
`inc()`, then thread 2 updates the connection rate and quota metric config to 
`B`, then thread 1 resumes execution and creates a sensor/metric with quota 
limit `A`, which leaves that metric inconsistent with the configured limit.
   
   If we synchronize on `counts` for only updates to 
`connectionRateForIp/defaultConnectionRate`, we know that thread 1 that has 
read a connection rate quota as `A` will finish creating quota metrics with 
limit `A` before thread 2 acquires the `counts` lock and updates 
`connectionRateForIp/defaultConnectionRate` to `B`. 
   
   After thread 2 releases the `counts` lock, subsequent threads calling 
`inc()` will read the quota as `B` and create a metric with limit `B`. Thread 2 
can then update any quota metrics from `A` to `B` without holding the `counts` 
lock: any operation that read the default connection rate limit as `A` has 
already finished creating its sensor with quota `A`, and all subsequent quotas 
will be read and created as `B`.
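   
   A minimal sketch of the locking scheme described above, not the actual `ConnectionQuotas` code. `QuotaSketch`, `sensorLimits`, `updateDefault` and `limitFor` are hypothetical names, a `TrieMap` stands in for the metrics registry, and the sketch assumes a single updater thread and that `inc()` reads the limit and creates the sensor while holding the `counts` lock:

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for ConnectionQuotas; only `counts`, `inc` and
// `defaultConnectionRatePerIp` are names taken from the discussion.
class QuotaSketch(initialDefault: Int) {
  // Stands in for the `counts` lock object.
  private val counts = new Object
  private var defaultConnectionRatePerIp: Int = initialDefault
  // Simulated quota metrics: ip -> quota limit stored on the sensor.
  private val sensorLimits = TrieMap.empty[String, Int]

  // Connection path: the limit is read and the sensor is created
  // while holding the lock, so both see the same value.
  def inc(ip: String): Unit = counts.synchronized {
    val limit = defaultConnectionRatePerIp
    sensorLimits.putIfAbsent(ip, limit)
  }

  // Update path (assumed single-threaded): only the write to the
  // default is done under the lock.
  def updateDefault(newLimit: Int): Unit = {
    counts.synchronized { defaultConnectionRatePerIp = newLimit }
    // Outside the lock: sensors created before the write still hold the
    // old limit; sensors created after it already hold newLimit.
    sensorLimits.keys.foreach(ip => sensorLimits.put(ip, newLimit))
  }

  def limitFor(ip: String): Option[Int] = sensorLimits.get(ip)
}
```

   With this shape, a sensor created before the update is brought to the new limit by the pass outside the lock, and a sensor created afterwards starts at the new limit.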
   
   The only remaining issue is that we can get concurrent reads of 
`connectionRatePerIp` while updating quota metrics, but we can just replace 
`mutable.Map` with `ConcurrentHashMap`, which is preferable to coarsely 
locking on `counts`.
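   
   As a rough illustration of that replacement (a sketch only: `IpRateOverrides`, `update` and `rateFor` are hypothetical names, and the `String` key type is an assumption), per-IP overrides kept in a `ConcurrentHashMap` can be read without any lock while the updater thread writes:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical holder for per-IP connection rate overrides.
object IpRateOverrides {
  // Assumed key type: the IP as a String; the real map may use another key.
  private val connectionRatePerIp = new ConcurrentHashMap[String, Integer]()

  // Set an override for the IP, or remove it when rate is None.
  def update(ip: String, rate: Option[Int]): Unit = rate match {
    case Some(r) => connectionRatePerIp.put(ip, Integer.valueOf(r)); ()
    case None    => connectionRatePerIp.remove(ip); ()
  }

  // Lock-free read: fall back to the supplied default when no override exists.
  def rateFor(ip: String, default: Int): Int =
    Option(connectionRatePerIp.get(ip)).map(_.intValue).getOrElse(default)
}
```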
   
   Let me know if I'm missing something here with respect to thread safety.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

