apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
       It would be more efficient to throttle IPs **after** we know that we 
can accept a connection based on the broker-wide and per-listener limits, since 
reaching a broker/listener limit blocks the acceptor thread, while throttling 
IPs needs more processing. Otherwise, if you reach both the broker and per-IP 
limits, the broker will continue accepting and delaying connections in a case 
where it is justified to block the acceptor thread because the broker rate 
limit was reached. Basically, call `waitForConnectionSlot` first. This is 
similar to how we check the per-IP limit on the number of connections after we 
know that we can accept a new connection based on the broker/listener limits.
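
The suggested ordering could look roughly like the sketch below. It is a simplified, self-contained model, not the actual `SocketServer` code: the two methods only borrow the names `waitForConnectionSlot` and `recordIpConnectionMaybeThrottle` from the diff, their bodies and the `ipOverLimit` parameter are hypothetical stand-ins, and the 100 ms delay is an arbitrary illustrative value.

```scala
import scala.collection.mutable.ListBuffer

object ThrottleOrderSketch {
  // Records the order in which the two checks run, for illustration only.
  val calls = ListBuffer.empty[String]

  // Stand-in for the broker-wide/per-listener check, which may block the acceptor thread.
  def waitForConnectionSlot(): Unit = calls.append("broker/listener")

  // Stand-in for the per-IP check; returns a throttle delay in ms (0 means no throttling).
  def recordIpConnectionMaybeThrottle(ipOverLimit: Boolean): Long = {
    calls.append("per-ip")
    if (ipOverLimit) 100L else 0L
  }

  // Suggested order: secure a connection slot first, then evaluate the per-IP quota.
  def inc(ipOverLimit: Boolean): Long = {
    waitForConnectionSlot()
    recordIpConnectionMaybeThrottle(ipOverLimit)
  }
}
```

With this order, the per-IP bookkeeping only runs for connections that have already cleared the broker and listener limits.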

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == "connection-accept-rate" &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey("ip")
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+
+    ip match {
+      case Some(addr) =>
+        val address = InetAddress.getByName(addr)
+        if (maxConnectionRate.isDefined) {
+          info(s"Updating max connection rate override for $address to ${maxConnectionRate.get}")
+          connectionRatePerIp.put(address, maxConnectionRate.get)
+        } else {
+          info(s"Removing max connection rate override for $address")
+          connectionRatePerIp.remove(address)
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        val newQuota = maxConnectionRate.getOrElse(Int.MaxValue)

Review comment:
       You can use your new constant 
`DynamicConfig.Ip.UnlimitedConnectionCreationRate` instead of `Int.MaxValue` 
here.
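
As a minimal sketch of that substitution: the nested `Ip` object below is a hypothetical stand-in for `DynamicConfig.Ip`, and the assumption that the constant equals `Int.MaxValue` is mine, inferred from the line it replaces.

```scala
object ConstantSketch {
  // Hypothetical stand-in for DynamicConfig.Ip; the value Int.MaxValue is an assumption.
  object Ip {
    val UnlimitedConnectionCreationRate: Int = Int.MaxValue
  }

  // None means no default quota was supplied, which maps to the named "unlimited" constant.
  def newQuota(maxConnectionRate: Option[Int]): Int =
    maxConnectionRate.getOrElse(Ip.UnlimitedConnectionCreationRate)
}
```

The behaviour is unchanged; the named constant only makes the intent of the fallback explicit.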

##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -409,6 +409,67 @@ class ConnectionQuotasTest {
     verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
   }
 
+  @Test

Review comment:
       It would be useful to add a test where we have both a per-listener and a 
per-IP limit, and verify that it throttles based on whichever limit is reached 
first. Something like: 2 IPs, each per-IP limit < per-listener limit, but sum 
of per-IP limits > listener limit. So, if you reach the limit on one IP, the 
broker would not throttle the second IP until it reaches the per-listener 
limit. It does not have to be exactly this; we just need to verify how per-IP 
throttling interacts with per-listener throttling.
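
The scenario could be modelled along the lines of the toy sketch below. Everything in it is hypothetical: `ToyQuotas` counts connections within a single window instead of using Kafka's rate-based sensors, and the limits (listener limit 5, per-IP limit 3) are example values chosen so that each per-IP limit is below the listener limit while their sum exceeds it.

```scala
import scala.collection.mutable

object QuotaInteractionSketch {
  sealed trait Outcome
  case object Accepted extends Outcome
  case object ThrottledByIp extends Outcome
  case object ThrottledByListener extends Outcome

  // Toy counter-based model of one quota window; real Kafka quotas are rate-based sensors.
  final class ToyQuotas(listenerLimit: Int, ipLimit: Int) {
    private var listenerCount = 0
    private val ipCounts = mutable.Map.empty[String, Int].withDefaultValue(0)

    // The listener limit is checked first, then the per-IP limit.
    def connect(ip: String): Outcome =
      if (listenerCount >= listenerLimit) ThrottledByListener
      else if (ipCounts(ip) >= ipLimit) ThrottledByIp
      else {
        listenerCount += 1
        ipCounts(ip) += 1
        Accepted
      }
  }
}
```

In this model, a first IP that exhausts its own limit is throttled per IP, while a second IP keeps being accepted until the listener limit is reached, at which point it is throttled even though it is still under its own per-IP limit.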

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
       I see. Yes, I think unrecording is more efficient than keeping more 
delayed connections than needed. Basically, when you unrecord from the per-IP 
metric, you can also unrecord from the broker and listener metrics.
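
One possible reading of the unrecording idea, as a rough sketch: `ToySensor` is a hypothetical plain counter, not Kafka's `Sensor`, and `ipLimit` is an illustrative per-IP cap. When the per-IP check fails, the connection is removed from all three metrics so that it does not count against the broker or listener quota either.

```scala
object UnrecordSketch {
  // Toy stand-in for a rate sensor: a plain counter instead of a windowed rate.
  final class ToySensor {
    private var count = 0
    def record(): Unit = count += 1
    def unrecord(): Unit = count -= 1
    def value: Int = count
  }

  val brokerSensor = new ToySensor
  val listenerSensor = new ToySensor
  val ipSensor = new ToySensor

  // Returns true if the connection is accepted; ipLimit is a hypothetical per-IP cap.
  def tryAccept(ipLimit: Int): Boolean = {
    brokerSensor.record()
    listenerSensor.record()
    ipSensor.record()
    if (ipSensor.value > ipLimit) {
      // Over the per-IP limit: undo the recording on all three metrics.
      ipSensor.unrecord()
      listenerSensor.unrecord()
      brokerSensor.unrecord()
      false
    } else true
  }
}
```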




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

