apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r512928815



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -1207,14 +1286,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+      val startThrottleTimeMs = time.milliseconds
+
+      waitForConnectionSlot(listenerName, startThrottleTimeMs, 
acceptorBlockedPercentMeter)
+
+      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
       There are some corner cases here where `startThrottleTimeMs` could be in 
the past if `waitForConnectionSlot()` waited for an active connection slot to 
become available (if we exceeded the limit for the number of active 
connections), or waited for broker-wide or listener-wide rate to get back to 
quota. In some cases, ipThrottleTimeMs would be zero if we checked against the 
current time here.
   
   Since getting System.currentTimeMillis() is not that expensive (as it used 
to be), I think it would be better to revert to `waitForConnectionSlot` getting 
its own time (as before this PR), and then `recordIpConnectionMaybeThrottle` 
getting current time and also calling the code block below and throwing 
ConnectionThrottledException. And then adding a comment here that 
`recordIpConnectionMaybeThrottle` would throw an exception if per-IP quota is 
exceeded. What do you think?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to