dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440142597



##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
       }
       // all connections should get added
       overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
-      listeners.values.foreach { listener =>
-        assertEquals(s"Number of connections on $listener:",
-          listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
listenerMaxConnections)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the total rate < broker-wide quota, and 
verify there is no throttling
+      val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
       }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 90
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    addListenersAndVerify(config, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // each listener creates connections such that the total connection rate 
> broker-wide quota
+      val connCreateIntervalMs = 10      // connection creation rate = 100
+      val connectionsPerListener = 400
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+      // verify that connections on non-inter-broker listener are throttled
+      verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+      // expect all connections to be created (no limit on the number of 
connections)
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 50
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate < listener quota on every listener, 
and verify there is no throttling
+      val connectionsPerListener = 200 // should take 5 seconds to create 200 
connections with rate = 40/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() => acceptConnections(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs)): Runnable)
+      }
+      futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+      // the blocked percent should still be 0, because no limits were reached
+      verifyNoBlockedPercentRecordedOnAllListeners()
+
+      verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+
+  @Test
+  def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+    val brokerRateLimit = 125
+    val listenerRateLimit = 30
+    val connCreateIntervalMs = 25 // connection creation rate = 40/sec per 
listener (3 * 40 = 120/sec total)
+    val props = brokerPropsWithDefaultConnectionLimits
+    props.put(KafkaConfig.MaxConnectionCreationRateProp, 
brokerRateLimit.toString)
+    val config = KafkaConfig.fromProps(props)
+    val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+    val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp -> 
listenerRateLimit.toString).asJava
+    addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+    val executor = Executors.newFixedThreadPool(listeners.size)
+    try {
+      // create connections with the rate > listener quota on every listener
+      // run a bit longer (20 seconds) to also verify the throttle rate
+      val connectionsPerListener = 600 // should take 20 seconds to create 600 
connections with rate = 30/sec
+      val futures = listeners.values.map { listener =>
+        executor.submit((() =>
+          // epsilon is set to account for the worst-case where the 
measurement is taken just before or after the quota window
+          acceptConnectionsAndVerifyRate(connectionQuotas, listener, 
connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+      }
+      futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+      // verify that every listener was throttled
+      blockedPercentMeters.foreach { case (name, meter) =>
+        assertTrue(s"Expected BlockedPercentMeter metric for $name listener to 
be recorded", meter.count() > 0)
+      }

Review comment:
       nit: Would it make sense to add the counterpart of 
`verifyNoBlockedPercentRecordedOnAllListeners` for this block? Something like 
`verifyNonZeroBlockedPercentRecordedOnAllListeners`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to