apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924
##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
   private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
+
   def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)

Review comment:
It would be more efficient if we throttled IPs **after** we know that we can accept a connection based on broker-wide and per-listener limits, since reaching broker/listener limits blocks the acceptor thread, while throttling IPs needs more processing.
Otherwise, if both the broker and per-IP limits are reached, the broker will keep accepting and delaying connections in a case where blocking the acceptor thread is already justified by the broker rate limit. Basically, call `waitForConnectionSlot` first. This is similar to how we check the per-IP limit on the number of connections only after we know that we can accept a new connection based on broker/listener limits.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
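
A minimal sketch of the ordering suggested in the review comment above, written as a toy model rather than the actual `ConnectionQuotas` code. The class `QuotaOrderSketch`, its parameters, and the `tryAccept`/`ipCount` methods are hypothetical names introduced only for illustration. The real code blocks the acceptor thread in `waitForConnectionSlot`; here a `None` return stands in for "the acceptor would block".

```scala
import scala.collection.mutable

// Hypothetical toy model: these names are illustrative and are NOT Kafka's API.
final class QuotaOrderSketch(brokerSlots: Int, ipRateLimit: Int) {
  private var usedSlots = 0
  private val perIpCount = mutable.Map.empty[String, Int]

  /** Returns None when the broker-wide limit is reached (the real acceptor
    * would block here), otherwise Some(throttled), where `throttled` says
    * whether this IP has exceeded its own limit. */
  def tryAccept(ip: String): Option[Boolean] = {
    // 1. Broker-wide (and, in the real code, per-listener) check comes first.
    if (usedSlots >= brokerSlots) {
      None
    } else {
      usedSlots += 1
      // 2. Only now is the connection charged against the per-IP quota.
      val newCount = perIpCount.getOrElse(ip, 0) + 1
      perIpCount.update(ip, newCount)
      Some(newCount > ipRateLimit)
    }
  }

  // Number of connections charged to this IP so far.
  def ipCount(ip: String): Int = perIpCount.getOrElse(ip, 0)
}
```

With this ordering, a connection that hits the broker-wide limit is never recorded against the per-IP quota, so no per-IP work is done for connections the broker cannot accept yet.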