dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r440142597
##########
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##########
@@ -302,21 +312,295 @@ class ConnectionQuotasTest {
}
// all connections should get added
overLimitFutures.foreach(_.get(5, TimeUnit.SECONDS))
- listeners.values.foreach { listener =>
- assertEquals(s"Number of connections on $listener:",
- listenerMaxConnections, connectionQuotas.get(listener.defaultIp))
+ verifyConnectionCountOnEveryListener(connectionQuotas,
listenerMaxConnections)
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+
+ @Test
+ def testBrokerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+ val brokerRateLimit = 125
+ val props = brokerPropsWithDefaultConnectionLimits
+ props.put(KafkaConfig.MaxConnectionCreationRateProp,
brokerRateLimit.toString)
+ val config = KafkaConfig.fromProps(props)
+ val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+ addListenersAndVerify(config, connectionQuotas)
+
+ val executor = Executors.newFixedThreadPool(listeners.size)
+ try {
+ // create connections with the total rate < broker-wide quota, and
verify there is no throttling
+ val connCreateIntervalMs = 25 // connection creation rate = 40/sec per
listener (3 * 40 = 120/sec total)
+ val connectionsPerListener = 200 // should take 5 seconds to create 200
connections with rate = 40/sec
+ val futures = listeners.values.map { listener =>
+ executor.submit((() => acceptConnections(connectionQuotas, listener,
connectionsPerListener, connCreateIntervalMs)): Runnable)
}
+ futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+ // the blocked percent should still be 0, because no limits were reached
+ verifyNoBlockedPercentRecordedOnAllListeners()
+ verifyConnectionCountOnEveryListener(connectionQuotas,
connectionsPerListener)
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+
+ @Test
+ def testBrokerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+ val brokerRateLimit = 90
+ val props = brokerPropsWithDefaultConnectionLimits
+ props.put(KafkaConfig.MaxConnectionCreationRateProp,
brokerRateLimit.toString)
+ val config = KafkaConfig.fromProps(props)
+ val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+ addListenersAndVerify(config, connectionQuotas)
+
+ val executor = Executors.newFixedThreadPool(listeners.size)
+ try {
+ // each listener creates connections such that the total connection rate
> broker-wide quota
+ val connCreateIntervalMs = 10 // connection creation rate = 100
+ val connectionsPerListener = 400
+ val futures = listeners.values.map { listener =>
+ executor.submit((() => acceptConnections(connectionQuotas, listener,
connectionsPerListener, connCreateIntervalMs)): Runnable)
+ }
+ futures.foreach(_.get(20, TimeUnit.SECONDS))
+
+ // verify that connections on non-inter-broker listener are throttled
+ verifyOnlyNonInterBrokerListenersBlockedPercentRecorded()
+
+ // expect all connections to be created (no limit on the number of
connections)
+ verifyConnectionCountOnEveryListener(connectionQuotas,
connectionsPerListener)
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+
+ @Test
+ def testListenerConnectionRateLimitWhenActualRateBelowLimit(): Unit = {
+ val brokerRateLimit = 125
+ val listenerRateLimit = 50
+ val connCreateIntervalMs = 25 // connection creation rate = 40/sec per
listener (3 * 40 = 120/sec total)
+ val props = brokerPropsWithDefaultConnectionLimits
+ props.put(KafkaConfig.MaxConnectionCreationRateProp,
brokerRateLimit.toString)
+ val config = KafkaConfig.fromProps(props)
+ val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+ val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp ->
listenerRateLimit.toString).asJava
+ addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+ val executor = Executors.newFixedThreadPool(listeners.size)
+ try {
+ // create connections with the rate < listener quota on every listener,
and verify there is no throttling
+ val connectionsPerListener = 200 // should take 5 seconds to create 200
connections with rate = 40/sec
+ val futures = listeners.values.map { listener =>
+ executor.submit((() => acceptConnections(connectionQuotas, listener,
connectionsPerListener, connCreateIntervalMs)): Runnable)
+ }
+ futures.foreach(_.get(10, TimeUnit.SECONDS))
+
+ // the blocked percent should still be 0, because no limits were reached
+ verifyNoBlockedPercentRecordedOnAllListeners()
+
+ verifyConnectionCountOnEveryListener(connectionQuotas,
connectionsPerListener)
+ } finally {
+ executor.shutdownNow()
+ }
+ }
+
+ @Test
+ def testListenerConnectionRateLimitWhenActualRateAboveLimit(): Unit = {
+ val brokerRateLimit = 125
+ val listenerRateLimit = 30
+ val connCreateIntervalMs = 25 // connection creation rate = 40/sec per
listener (3 * 40 = 120/sec total)
+ val props = brokerPropsWithDefaultConnectionLimits
+ props.put(KafkaConfig.MaxConnectionCreationRateProp,
brokerRateLimit.toString)
+ val config = KafkaConfig.fromProps(props)
+ val connectionQuotas = new ConnectionQuotas(config, Time.SYSTEM, metrics)
+
+ val listenerConfig = Map(KafkaConfig.MaxConnectionCreationRateProp ->
listenerRateLimit.toString).asJava
+ addListenersAndVerify(config, listenerConfig, connectionQuotas)
+
+ val executor = Executors.newFixedThreadPool(listeners.size)
+ try {
+ // create connections with the rate > listener quota on every listener
+ // run a bit longer (20 seconds) to also verify the throttle rate
+ val connectionsPerListener = 600 // should take 20 seconds to create 600
connections with rate = 30/sec
+ val futures = listeners.values.map { listener =>
+ executor.submit((() =>
+ // epsilon is set to account for the worst-case where the
measurement is taken just before or after the quota window
+ acceptConnectionsAndVerifyRate(connectionQuotas, listener,
connectionsPerListener, connCreateIntervalMs, listenerRateLimit, 5)): Runnable)
+ }
+ futures.foreach(_.get(30, TimeUnit.SECONDS))
+
+ // verify that every listener was throttled
+ blockedPercentMeters.foreach { case (name, meter) =>
+ assertTrue(s"Expected BlockedPercentMeter metric for $name listener to
be recorded", meter.count() > 0)
+ }
Review comment:
nit: Would it make sense to add the pendant of
`verifyNoBlockedPercentRecordedOnAllListeners` for this block? Something like
`verifyNonZeroBlockedPercentRecordedOnAllListeners`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]