apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
       It would be more efficient to throttle IPs **after** we know that we 
can accept the connection based on the broker-wide and per-listener limits. 
Reaching a broker or listener limit simply blocks the acceptor thread, while 
throttling an IP needs more processing. Otherwise, if both the broker limit 
and the per-IP limit are reached, the broker will keep accepting and delaying 
connections in a case where it would be justified to block the acceptor 
thread because the broker rate limit has been reached. Basically, call 
`waitForConnectionSlot` first. This is similar to how we check the per-IP 
limit on the number of connections only after we know that we can accept a 
new connection based on the broker/listener limits. 
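
To illustrate the ordering I have in mind, here is a minimal, self-contained sketch. It is not the real `ConnectionQuotas` code: `AcceptOrderSketch`, the simplified signatures, the `ipOverQuota` flag, and the 100 ms throttle value are all hypothetical stand-ins, and only the names `waitForConnectionSlot` and `recordIpConnectionMaybeThrottle` come from this diff.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ConnectionQuotas (not the real Kafka class).
object AcceptOrderSketch {
  // Records the order in which the checks ran; for illustration only.
  val trace: mutable.ArrayBuffer[String] = mutable.ArrayBuffer[String]()

  // Stand-in for waitForConnectionSlot. In the broker this step may block the
  // acceptor thread until the broker-wide and per-listener limits allow a connection.
  def waitForConnectionSlot(listener: String): Unit = {
    trace += s"waitForConnectionSlot($listener)"
    ()
  }

  // Stand-in for recordIpConnectionMaybeThrottle. Returns a throttle time in ms.
  // The ipOverQuota flag and the 100 ms value are assumptions made for this sketch.
  def recordIpConnectionMaybeThrottle(ip: String, ipOverQuota: Boolean): Long = {
    trace += s"recordIpConnectionMaybeThrottle($ip)"
    if (ipOverQuota) 100L else 0L
  }

  // Suggested ordering: broker/listener limits first, per-IP throttling second.
  def inc(listener: String, ip: String, ipOverQuota: Boolean): Long = {
    waitForConnectionSlot(listener)
    recordIpConnectionMaybeThrottle(ip, ipOverQuota)
  }

  def main(args: Array[String]): Unit = {
    val throttleMs = inc("PLAINTEXT", "10.0.0.1", ipOverQuota = true)
    println(s"ipThrottleTimeMs=$throttleMs, order=${trace.mkString(" -> ")}")
  }
}
```

With this ordering, the per-IP work only happens once the broker-wide and per-listener checks have let the connection through.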





