This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a2f202a1e9 MINOR: Move ClientQuotasRequestTest to server module
(#20053)
9a2f202a1e9 is described below
commit 9a2f202a1e99a807b6ba91bb3c4b9148089f0b02
Author: Lan Ding <[email protected]>
AuthorDate: Sun Jul 20 23:14:55 2025 +0800
MINOR: Move ClientQuotasRequestTest to server module (#20053)
1. Move ClientQuotasRequestTest to server module.
2. Rewrite ClientQuotasRequestTest in Java.
Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/server/ClientQuotasRequestTest.scala | 592 -----------------
.../server/quota/ClientQuotasRequestTest.java | 726 +++++++++++++++++++++
2 files changed, 726 insertions(+), 592 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
deleted file mode 100644
index dafd11d033c..00000000000
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ /dev/null
@@ -1,592 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.net.InetAddress
-import java.util
-import java.util.concurrent.{ExecutionException, TimeUnit}
-import org.apache.kafka.common.test.api.ClusterTest
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{ScramCredentialInfo, ScramMechanism,
UserScramCredentialUpsertion}
-import org.apache.kafka.common.errors.{InvalidRequestException,
UnsupportedVersionException}
-import org.apache.kafka.common.internals.KafkaFutureImpl
-import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.requests.{AlterClientQuotasRequest,
AlterClientQuotasResponse, DescribeClientQuotasRequest,
DescribeClientQuotasResponse}
-import org.apache.kafka.common.test.ClusterInstance
-import org.apache.kafka.server.IntegrationTestUtils
-import org.apache.kafka.server.config.QuotaConfig
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Disabled
-
-import scala.jdk.CollectionConverters._
-
-class ClientQuotasRequestTest(cluster: ClusterInstance) {
- @ClusterTest
- def testAlterClientQuotasRequest(): Unit = {
-
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user",
ClientQuotaEntity.CLIENT_ID -> "client-id").asJava)
-
- // Expect an empty configuration.
- verifyDescribeEntityQuotas(entity, Map.empty)
-
- // Add two configuration entries.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
-
- // Update an existing entry.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(15000.0)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 15000.0,
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
-
- // Remove an existing configuration entry.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> None
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
-
- // Remove a non-existent configuration entry. This should make no changes.
- alterEntityQuotas(entity, Map(
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
-
- // Add back a deleted configuration entry.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(5000.0)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 5000.0,
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
-
- // Perform a mixed update.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> None,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.3)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 12.3
- ))
- }
-
- @ClusterTest
- def testAlterClientQuotasRequestValidateOnly(): Unit = {
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
"user").asJava)
-
- // Set up a configuration.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(23.45)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
- ))
-
- // Validate-only addition.
- alterEntityQuotas(entity, Map(
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0)
- ), validateOnly = true)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
- ))
-
- // Validate-only modification.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0)
- ), validateOnly = true)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
- ))
-
- // Validate-only removal.
- alterEntityQuotas(entity, Map(
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
- ), validateOnly = true)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
- ))
-
- // Validate-only mixed update.
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(50000.0),
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> None
- ), validateOnly = true)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> 23.45
- ))
- }
-
- @Disabled("TODO: KAFKA-17630 - Convert
ClientQuotasRequestTest#testClientQuotasForScramUsers to kraft")
- @ClusterTest
- def testClientQuotasForScramUsers(): Unit = {
- val userName = "user"
-
- val admin = cluster.admin()
- try {
- val results = admin.alterUserScramCredentials(util.Arrays.asList(
- new UserScramCredentialUpsertion(userName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")))
- results.all.get
-
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
userName).asJava)
-
- verifyDescribeEntityQuotas(entity, Map.empty)
-
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(10000.0),
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0)
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 10000.0,
- QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0
- ))
- } finally {
- admin.close()
- }
- }
-
- @ClusterTest
- def testAlterIpQuotasRequest(): Unit = {
- val knownHost = "1.2.3.4"
- val unknownHost = "2.3.4.5"
- val entity = toIpEntity(Some(knownHost))
- val defaultEntity = toIpEntity(Some(null))
- val entityFilter =
ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost)
- val defaultEntityFilter =
ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP)
- val allIpEntityFilter =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
-
- def verifyIpQuotas(entityFilter: ClientQuotaFilterComponent,
expectedMatches: Map[ClientQuotaEntity, Double]): Unit = {
- TestUtils.tryUntilNoAssertionError() {
- val result =
describeClientQuotas(ClientQuotaFilter.containsOnly(List(entityFilter).asJava))
- assertEquals(expectedMatches.keySet, result.asScala.keySet)
- result.asScala.foreach { case (entity, props) =>
- assertEquals(Set(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG),
props.asScala.keySet)
- assertEquals(expectedMatches(entity),
props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG))
- val entityName = entity.entries.get(ClientQuotaEntity.IP)
- // ClientQuotaEntity with null name maps to default entity
- val entityIp = if (entityName == null)
- InetAddress.getByName(unknownHost)
- else
- InetAddress.getByName(entityName)
- var currentServerQuota = 0
- currentServerQuota =
cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)
- assertTrue(Math.abs(expectedMatches(entity) - currentServerQuota) <
0.01,
- s"Connection quota of $entity is not ${expectedMatches(entity)}
but $currentServerQuota")
- }
- }
- }
-
- // Expect an empty configuration.
- verifyIpQuotas(allIpEntityFilter, Map.empty)
-
- // Add a configuration entry.
- alterEntityQuotas(entity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(100.0)),
validateOnly = false)
- verifyIpQuotas(entityFilter, Map(entity -> 100.0))
-
- // update existing entry
- alterEntityQuotas(entity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(150.0)),
validateOnly = false)
- verifyIpQuotas(entityFilter, Map(entity -> 150.0))
-
- // update default value
- alterEntityQuotas(defaultEntity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(200.0)),
validateOnly = false)
- verifyIpQuotas(defaultEntityFilter, Map(defaultEntity -> 200.0))
-
- // describe all IP quotas
- verifyIpQuotas(allIpEntityFilter, Map(entity -> 150.0, defaultEntity ->
200.0))
-
- // remove entry
- alterEntityQuotas(entity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly =
false)
- verifyIpQuotas(entityFilter, Map.empty)
-
- // remove default value
- alterEntityQuotas(defaultEntity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> None), validateOnly =
false)
- verifyIpQuotas(allIpEntityFilter, Map.empty)
- }
-
- @ClusterTest
- def testAlterClientQuotasInvalidRequests(): Unit = {
- var entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
"").asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG ->
Some(12.34)), validateOnly = true))
-
- entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID ->
"").asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG ->
Some(12.34)), validateOnly = true))
-
- entity = new ClientQuotaEntity(Map("" -> "name").asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG ->
Some(12.34)), validateOnly = true))
-
- entity = new ClientQuotaEntity(Map.empty.asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG ->
Some(10000.5)), validateOnly = true))
-
- entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
"user").asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map("bad" -> Some(1.0)), validateOnly = true))
-
- entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
"user").asJava)
- assertThrows(classOf[InvalidRequestException], () =>
alterEntityQuotas(entity, Map(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG ->
Some(10000.5)), validateOnly = true))
- }
-
- private def expectInvalidRequestWithMessage(runnable: => Unit,
expectedMessage: String): Unit = {
- val exception = assertThrows(classOf[InvalidRequestException], () =>
runnable)
- assertTrue(exception.getMessage.contains(expectedMessage), s"Expected
message $exception to contain $expectedMessage")
- }
-
- @ClusterTest
- def testAlterClientQuotasInvalidEntityCombination(): Unit = {
- val userAndIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER ->
"user", ClientQuotaEntity.IP -> "1.2.3.4").asJava)
- val clientAndIpEntity = new
ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> "client",
ClientQuotaEntity.IP -> "1.2.3.4").asJava)
- val expectedExceptionMessage = "Invalid quota entity combination"
- expectInvalidRequestWithMessage(alterEntityQuotas(userAndIpEntity,
Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)),
- validateOnly = true), expectedExceptionMessage)
- expectInvalidRequestWithMessage(alterEntityQuotas(clientAndIpEntity,
Map(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG -> Some(12.34)),
- validateOnly = true), expectedExceptionMessage)
- }
-
- @ClusterTest
- def testAlterClientQuotasBadIp(): Unit = {
- val invalidHostPatternEntity = new
ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "not a valid host because it has
spaces").asJava)
- val unresolvableHostEntity = new
ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "RFC2606.invalid").asJava)
- val expectedExceptionMessage = "not a valid IP"
-
expectInvalidRequestWithMessage(alterEntityQuotas(invalidHostPatternEntity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)),
- validateOnly = true), expectedExceptionMessage)
- expectInvalidRequestWithMessage(alterEntityQuotas(unresolvableHostEntity,
Map(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG -> Some(50.0)),
- validateOnly = true), expectedExceptionMessage)
- }
-
- @ClusterTest
- def testDescribeClientQuotasInvalidFilterCombination(): Unit = {
- val ipFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)
- val userFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)
- val clientIdFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)
- val expectedExceptionMessage = "Invalid entity filter component
combination"
-
expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent,
userFilterComponent).asJava)),
- expectedExceptionMessage)
-
expectInvalidRequestWithMessage(describeClientQuotas(ClientQuotaFilter.contains(List(ipFilterComponent,
clientIdFilterComponent).asJava)),
- expectedExceptionMessage)
- }
-
- // Entities to be matched against.
- private val matchUserClientEntities = List(
- (Some("user-1"), Some("client-id-1"), 50.50),
- (Some("user-2"), Some("client-id-1"), 51.51),
- (Some("user-3"), Some("client-id-2"), 52.52),
- (Some(null), Some("client-id-1"), 53.53),
- (Some("user-1"), Some(null), 54.54),
- (Some("user-3"), Some(null), 55.55),
- (Some("user-1"), None, 56.56),
- (Some("user-2"), None, 57.57),
- (Some("user-3"), None, 58.58),
- (Some(null), None, 59.59),
- (None, Some("client-id-2"), 60.60)
- ).map { case (u, c, v) => (toClientEntity(u, c), v) }
-
- private val matchIpEntities = List(
- (Some("1.2.3.4"), 10.0),
- (Some("2.3.4.5"), 20.0)
- ).map { case (ip, quota) => (toIpEntity(ip), quota)}
-
- private def setupDescribeClientQuotasMatchTest(): Unit = {
- val userClientQuotas = matchUserClientEntities.map { case (e, v) =>
- e -> Map((QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Some(v)))
- }.toMap
- val ipQuotas = matchIpEntities.map { case (e, v) =>
- e -> Map((QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Some(v)))
- }.toMap
- val result = alterClientQuotas(userClientQuotas ++ ipQuotas, validateOnly
= false)
- (matchUserClientEntities ++ matchIpEntities).foreach(e =>
result(e._1).get(10, TimeUnit.SECONDS))
- }
-
- @ClusterTest
- def testDescribeClientQuotasMatchExact(): Unit = {
- setupDescribeClientQuotasMatchTest()
-
- def matchEntity(entity: ClientQuotaEntity) = {
- val components = entity.entries.asScala.map { case (entityType,
entityName) =>
- entityName match {
- case null => ClientQuotaFilterComponent.ofDefaultEntity(entityType)
- case name => ClientQuotaFilterComponent.ofEntity(entityType, name)
- }
- }
-
describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
- }
-
- // Test exact matches.
- matchUserClientEntities.foreach { case (e, v) =>
- TestUtils.tryUntilNoAssertionError() {
- val result = matchEntity(e)
- assertEquals(1, result.size)
- assertTrue(result.get(e) != null)
- val value =
result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG)
- assertNotNull(value)
- assertEquals(value, v, 1e-6)
- }
- }
-
- // Entities not contained in `matchEntityList`.
- val notMatchEntities = List(
- (Some("user-1"), Some("client-id-2")),
- (Some("user-3"), Some("client-id-1")),
- (Some("user-2"), Some(null)),
- (Some("user-4"), None),
- (Some(null), Some("client-id-2")),
- (None, Some("client-id-1")),
- (None, Some("client-id-3")),
- ).map { case (u, c) =>
- new ClientQuotaEntity((u.map((ClientQuotaEntity.USER, _)) ++
- c.map((ClientQuotaEntity.CLIENT_ID, _))).toMap.asJava)
- }
-
- // Verify exact matches of the non-matches returns empty.
- notMatchEntities.foreach { e =>
- val result = matchEntity(e)
- assertEquals(0, result.size)
- }
- }
-
- @ClusterTest
- def testDescribeClientQuotasMatchPartial(): Unit = {
- setupDescribeClientQuotasMatchTest()
-
- def testMatchEntities(filter: ClientQuotaFilter, expectedMatchSize: Int,
partition: ClientQuotaEntity => Boolean): Unit = {
- TestUtils.tryUntilNoAssertionError() {
- val result = describeClientQuotas(filter)
- val (expectedMatches, _) = (matchUserClientEntities ++
matchIpEntities).partition(e => partition(e._1))
- assertEquals(expectedMatchSize, expectedMatches.size) // for test
verification
- assertEquals(expectedMatchSize, result.size, s"Failed to match
$expectedMatchSize entities for $filter")
- val expectedMatchesMap = expectedMatches.toMap
- matchUserClientEntities.foreach { case (entity, expectedValue) =>
- if (expectedMatchesMap.contains(entity)) {
- val config = result.get(entity)
- assertNotNull(config)
- val value =
config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG)
- assertNotNull(value)
- assertEquals(expectedValue, value, 1e-6)
- } else {
- assertNull(result.get(entity))
- }
- }
- matchIpEntities.foreach { case (entity, expectedValue) =>
- if (expectedMatchesMap.contains(entity)) {
- val config = result.get(entity)
- assertNotNull(config)
- val value =
config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG)
- assertNotNull(value)
- assertEquals(expectedValue, value, 1e-6)
- } else {
- assertNull(result.get(entity))
- }
- }
- }
- }
-
- // Match open-ended existing user.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"user-1")).asJava), 3,
- entity => entity.entries.get(ClientQuotaEntity.USER) == "user-1"
- )
-
- // Match open-ended non-existent user.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"unknown")).asJava), 0,
- entity => false
- )
-
- // Match open-ended existing client ID.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID,
"client-id-2")).asJava), 2,
- entity => entity.entries.get(ClientQuotaEntity.CLIENT_ID) ==
"client-id-2"
- )
-
- // Match open-ended default user.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER)).asJava),
2,
- entity => entity.entries.containsKey(ClientQuotaEntity.USER) &&
entity.entries.get(ClientQuotaEntity.USER) == null
- )
-
- // Match close-ended existing user.
- testMatchEntities(
-
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"user-2")).asJava), 1,
- entity => entity.entries.get(ClientQuotaEntity.USER) == "user-2" &&
!entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
- )
-
- // Match close-ended existing client ID that has no matching entity.
- testMatchEntities(
-
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID,
"client-id-1")).asJava), 0,
- entity => false
- )
-
- // Match against all entities with the user type in a close-ended match.
- testMatchEntities(
-
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava),
4,
- entity => entity.entries.containsKey(ClientQuotaEntity.USER) &&
!entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
- )
-
- // Match against all entities with the user type in an open-ended match.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER)).asJava),
10,
- entity => entity.entries.containsKey(ClientQuotaEntity.USER)
- )
-
- // Match against all entities with the client ID type in a close-ended
match.
- testMatchEntities(
-
ClientQuotaFilter.containsOnly(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava),
1,
- entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID) &&
!entity.entries.containsKey(ClientQuotaEntity.USER)
- )
-
- // Match against all entities with the client ID type in an open-ended
match.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID)).asJava),
7,
- entity => entity.entries.containsKey(ClientQuotaEntity.CLIENT_ID)
- )
-
- // Match against all entities with IP type in an open-ended match.
- testMatchEntities(
-
ClientQuotaFilter.contains(List(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP)).asJava),
2,
- entity => entity.entries.containsKey(ClientQuotaEntity.IP)
- )
-
- // Match open-ended empty filter list. This should match all entities.
- testMatchEntities(ClientQuotaFilter.contains(List.empty.asJava), 13,
entity => true)
-
- // Match close-ended empty filter list. This should match no entities.
- testMatchEntities(ClientQuotaFilter.containsOnly(List.empty.asJava), 0, _
=> false)
- }
-
- @ClusterTest
- def testClientQuotasUnsupportedEntityTypes(): Unit = {
- val entity = new ClientQuotaEntity(Map("other" -> "name").asJava)
- assertThrows(classOf[UnsupportedVersionException], () =>
verifyDescribeEntityQuotas(entity, Map.empty))
- }
-
- @ClusterTest
- def testClientQuotasSanitized(): Unit = {
- // An entity with name that must be sanitized when writing to Zookeeper.
- val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user
with spaces").asJava)
-
- alterEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> Some(20000.0),
- ), validateOnly = false)
-
- verifyDescribeEntityQuotas(entity, Map(
- QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG -> 20000.0,
- ))
- }
-
- private def verifyDescribeEntityQuotas(entity: ClientQuotaEntity, quotas:
Map[String, Double]): Unit = {
- TestUtils.tryUntilNoAssertionError(waitTime = 5000L) {
- val components = entity.entries.asScala.map { case (entityType,
entityName) =>
- Option(entityName).map{ name =>
ClientQuotaFilterComponent.ofEntity(entityType, name)}
- .getOrElse(ClientQuotaFilterComponent.ofDefaultEntity(entityType)
- )
- }
- val describe =
describeClientQuotas(ClientQuotaFilter.containsOnly(components.toList.asJava))
- if (quotas.isEmpty) {
- assertEquals(0, describe.size)
- } else {
- assertEquals(1, describe.size)
- val configs = describe.get(entity)
- assertNotNull(configs)
- assertEquals(quotas.size, configs.size)
- quotas.foreach { case (k, v) =>
- val value = configs.get(k)
- assertNotNull(value)
- assertEquals(v, value, 1e-6)
- }
- }
- }
- }
-
- private def toClientEntity(user: Option[String], clientId: Option[String]) =
- new ClientQuotaEntity((user.map(ClientQuotaEntity.USER -> _) ++
clientId.map(ClientQuotaEntity.CLIENT_ID -> _)).toMap.asJava)
-
- private def toIpEntity(ip: Option[String]) = new
ClientQuotaEntity(ip.map(ClientQuotaEntity.IP -> _).toMap.asJava)
-
- private def describeClientQuotas(filter: ClientQuotaFilter) = {
- val result = new KafkaFutureImpl[java.util.Map[ClientQuotaEntity,
java.util.Map[String, java.lang.Double]]]
- sendDescribeClientQuotasRequest(filter).complete(result)
- try result.get catch {
- case e: ExecutionException => throw e.getCause
- }
- }
-
- private def sendDescribeClientQuotasRequest(filter: ClientQuotaFilter):
DescribeClientQuotasResponse = {
- val request = new DescribeClientQuotasRequest.Builder(filter).build()
- IntegrationTestUtils.connectAndReceive[DescribeClientQuotasResponse](
- request,
- cluster.boundPorts().get(0))
- }
-
- private def alterEntityQuotas(entity: ClientQuotaEntity, alter: Map[String,
Option[Double]], validateOnly: Boolean) =
- try alterClientQuotas(Map(entity -> alter), validateOnly)(entity).get(10,
TimeUnit.SECONDS) catch {
- case e: ExecutionException => throw e.getCause
- }
-
- private def alterClientQuotas(request: Map[ClientQuotaEntity, Map[String,
Option[Double]]], validateOnly: Boolean) = {
- val entries = request.map { case (entity, alter) =>
- val ops = alter.map { case (key, value) =>
- new ClientQuotaAlteration.Op(key, value.map(Double.box).orNull)
- }.asJavaCollection
- new ClientQuotaAlteration(entity, ops)
- }
-
- val response = request.map(e => e._1 -> new KafkaFutureImpl[Void]).asJava
- sendAlterClientQuotasRequest(entries, validateOnly).complete(response)
- val result = response.asScala
- assertEquals(request.size, result.size)
- request.foreach(e => assertTrue(result.contains(e._1)))
- result
- }
-
- private def sendAlterClientQuotasRequest(entries:
Iterable[ClientQuotaAlteration], validateOnly: Boolean):
AlterClientQuotasResponse = {
- val request = new
AlterClientQuotasRequest.Builder(entries.asJavaCollection, validateOnly).build()
- IntegrationTestUtils.connectAndReceive[AlterClientQuotasResponse](
- request,
- cluster.boundPorts().get(0))
- }
-}
diff --git
a/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
new file mode 100644
index 00000000000..a4d562a5543
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/quota/ClientQuotasRequestTest.java
@@ -0,0 +1,726 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterClientQuotasOptions;
+import org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
+import org.apache.kafka.clients.admin.ScramCredentialInfo;
+import org.apache.kafka.clients.admin.ScramMechanism;
+import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.server.config.QuotaConfig;
+import org.apache.kafka.test.TestUtils;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientQuotasRequestTest {
+ private final ClusterInstance cluster;
+
+ public ClientQuotasRequestTest(ClusterInstance cluster) {
+ this.cluster = cluster;
+ }
+
+ @ClusterTest
+ public void testAlterClientQuotasRequest() throws InterruptedException {
+ ClientQuotaEntity entity = new ClientQuotaEntity(
+ Map.of(ClientQuotaEntity.USER, "user",
ClientQuotaEntity.CLIENT_ID, "client-id"));
+
+ // Expect an empty configuration.
+ verifyDescribeEntityQuotas(entity, Map.of());
+
+ // Add two configuration entries.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(10000.0),
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(20000.0)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0,
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+
+ // Update an existing entry.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(15000.0)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 15000.0,
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+
+ // Remove an existing configuration entry.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty()
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+
+ // Remove a non-existent configuration entry. This should make no
changes.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+
+ // Add back a deleted configuration entry.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(5000.0)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 5000.0,
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+
+ // Perform a mixed update.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(20000.0),
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, Optional.empty(),
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.3)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 12.3
+ ));
+ }
+
+ @ClusterTest
+ public void testAlterClientQuotasRequestValidateOnly() throws
InterruptedException {
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
+
+ // Set up a configuration.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(20000.0),
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(23.45)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
+ ));
+
+ // Validate-only addition.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(50000.0)
+ ), true);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
+ ));
+
+ // Validate-only modification.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(10000.0)
+ ), true);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
+ ));
+
+ // Validate-only removal.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
+ ), true);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
+ ));
+
+ // Validate-only mixed update.
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(10000.0),
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(50000.0),
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.empty()
+ ), true);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0,
+ QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, 23.45
+ ));
+ }
+
+ @ClusterTest
+ public void testClientQuotasForScramUsers() throws InterruptedException,
ExecutionException {
+ final String userName = "user";
+
+ try (Admin admin = cluster.admin()) {
+ AlterUserScramCredentialsResult results =
admin.alterUserScramCredentials(List.of(
+ new UserScramCredentialUpsertion(userName, new
ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096), "password")));
+ results.all().get();
+
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, userName));
+
+ verifyDescribeEntityQuotas(entity, Map.of());
+
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(10000.0),
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(20000.0)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 10000.0,
+ QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+ }
+ }
+
+ @ClusterTest
+ public void testAlterIpQuotasRequest() throws InterruptedException {
+ final String knownHost = "1.2.3.4";
+ final String unknownHost = "2.3.4.5";
+ ClientQuotaEntity entity = toIpEntity(Optional.of(knownHost));
+ ClientQuotaEntity defaultEntity = toIpEntity(Optional.empty());
+ ClientQuotaFilterComponent entityFilter =
ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.IP, knownHost);
+ ClientQuotaFilterComponent defaultEntityFilter =
ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.IP);
+ ClientQuotaFilterComponent allIpEntityFilter =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP);
+
+ // Expect an empty configuration.
+ verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost);
+
+ // Add a configuration entry.
+ alterEntityQuotas(entity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(100.0)),
false);
+ verifyIpQuotas(entityFilter, Map.of(entity, 100.0), unknownHost);
+
+ // update existing entry
+ alterEntityQuotas(entity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(150.0)),
false);
+ verifyIpQuotas(entityFilter, Map.of(entity, 150.0), unknownHost);
+
+ // update default value
+ alterEntityQuotas(defaultEntity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(200.0)),
false);
+ verifyIpQuotas(defaultEntityFilter, Map.of(defaultEntity, 200.0),
unknownHost);
+
+ // describe all IP quotas
+ verifyIpQuotas(allIpEntityFilter, Map.of(entity, 150.0, defaultEntity,
200.0), unknownHost);
+
+ // remove entry
+ alterEntityQuotas(entity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()),
false);
+ verifyIpQuotas(entityFilter, Map.of(), unknownHost);
+
+ // remove default value
+ alterEntityQuotas(defaultEntity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.empty()),
false);
+ verifyIpQuotas(allIpEntityFilter, Map.of(), unknownHost);
+ }
+
+ private void verifyIpQuotas(ClientQuotaFilterComponent entityFilter,
Map<ClientQuotaEntity, Double> expectedMatches,
+ String unknownHost) throws InterruptedException {
+
+ TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
+ Map<ClientQuotaEntity, Map<String, Double>> result =
describeClientQuotas(
+ ClientQuotaFilter.containsOnly(List.of(entityFilter))).get();
+ assertEquals(expectedMatches.keySet(), result.keySet());
+
+ for (Map.Entry<ClientQuotaEntity, Map<String, Double>> entry :
result.entrySet()) {
+ ClientQuotaEntity entity = entry.getKey();
+ Map<String, Double> props = entry.getValue();
+
assertEquals(Set.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG),
props.keySet());
+ assertEquals(expectedMatches.get(entity),
props.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG));
+ String entityName = entity.entries().get(ClientQuotaEntity.IP);
+ // ClientQuotaEntity with null name maps to default entity
+ InetAddress entityIp = entityName == null
+ ? InetAddress.getByName(unknownHost)
+ : InetAddress.getByName(entityName);
+ int currentServerQuota = cluster.brokers()
+ .values()
+ .iterator()
+ .next()
+ .socketServer()
+ .connectionQuotas()
+ .connectionRateForIp(entityIp);
+ assertTrue(Math.abs(expectedMatches.get(entity) -
currentServerQuota) < 0.01,
+ String.format("Connection quota of %s is not %s but %s",
entity, expectedMatches.get(entity), currentServerQuota));
+ }
+ });
+ }
+
+ @ClusterTest
+ public void testAlterClientQuotasInvalidRequests() {
+ final ClientQuotaEntity entity1 = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, ""));
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity1,
Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)),
true));
+
+ final ClientQuotaEntity entity2 = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.CLIENT_ID, ""));
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity2,
Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)),
true));
+
+ final ClientQuotaEntity entity3 = new ClientQuotaEntity(Map.of("",
"name"));
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity3,
Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)),
true));
+
+ final ClientQuotaEntity entity4 = new ClientQuotaEntity(Map.of());
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity4,
Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)),
true));
+
+ final ClientQuotaEntity entity5 = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity5, Map.of("bad", Optional.of(1.0)), true));
+
+ final ClientQuotaEntity entity6 = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user"));
+ TestUtils.assertFutureThrows(InvalidRequestException.class,
+ alterEntityQuotas(entity6,
Map.of(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, Optional.of(10000.5)),
true));
+ }
+
+ private void expectInvalidRequestWithMessage(Future<?> future, String
expectedMessage) {
+ InvalidRequestException exception =
TestUtils.assertFutureThrows(InvalidRequestException.class, future);
+ assertNotNull(exception);
+ assertTrue(
+ exception.getMessage().contains(expectedMessage),
+ String.format("Expected message %s to contain %s", exception,
expectedMessage)
+ );
+ }
+
+ @ClusterTest
+ public void testAlterClientQuotasInvalidEntityCombination() {
+ ClientQuotaEntity userAndIpEntity = new ClientQuotaEntity(
+ Map.of(ClientQuotaEntity.USER, "user", ClientQuotaEntity.IP,
"1.2.3.4")
+ );
+ ClientQuotaEntity clientAndIpEntity = new ClientQuotaEntity(
+ Map.of(ClientQuotaEntity.CLIENT_ID, "client",
ClientQuotaEntity.IP, "1.2.3.4")
+ );
+ final String expectedExceptionMessage = "Invalid quota entity
combination";
+
+ expectInvalidRequestWithMessage(
+ alterEntityQuotas(userAndIpEntity,
Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)),
true),
+ expectedExceptionMessage
+ );
+
+ expectInvalidRequestWithMessage(
+ alterEntityQuotas(clientAndIpEntity,
Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG, Optional.of(12.34)),
true),
+ expectedExceptionMessage
+ );
+ }
+
+ @ClusterTest
+ public void testAlterClientQuotasBadIp() {
+ ClientQuotaEntity invalidHostPatternEntity = new ClientQuotaEntity(
+ Map.of(ClientQuotaEntity.IP, "not a valid host because it has
spaces")
+ );
+ ClientQuotaEntity unresolvableHostEntity = new ClientQuotaEntity(
+ Map.of(ClientQuotaEntity.IP, "RFC2606.invalid")
+ );
+ final String expectedExceptionMessage = "not a valid IP";
+
+ expectInvalidRequestWithMessage(
+ alterEntityQuotas(invalidHostPatternEntity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)),
true),
+ expectedExceptionMessage
+ );
+
+ expectInvalidRequestWithMessage(
+ alterEntityQuotas(unresolvableHostEntity,
Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, Optional.of(50.0)),
true),
+ expectedExceptionMessage
+ );
+ }
+
+ @ClusterTest
+ public void testDescribeClientQuotasInvalidFilterCombination() {
+ ClientQuotaFilterComponent ipFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP);
+ ClientQuotaFilterComponent userFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER);
+ ClientQuotaFilterComponent clientIdFilterComponent =
ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID);
+ final String expectedExceptionMessage = "Invalid entity filter
component combination";
+
+ expectInvalidRequestWithMessage(
+
describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent,
userFilterComponent))),
+ expectedExceptionMessage
+ );
+ expectInvalidRequestWithMessage(
+
describeClientQuotas(ClientQuotaFilter.contains(List.of(ipFilterComponent,
clientIdFilterComponent))),
+ expectedExceptionMessage
+ );
+ }
+
+ // Entities to be matched against.
+ private final Map<ClientQuotaEntity, Double> matchUserClientEntities = new
HashMap<>(Map.ofEntries(
+ Map.entry(toClientEntity(toUserMap("user-1"),
toClientIdMap("client-id-1")), 50.50),
+ Map.entry(toClientEntity(toUserMap("user-2"),
toClientIdMap("client-id-1")), 51.51),
+ Map.entry(toClientEntity(toUserMap("user-3"),
toClientIdMap("client-id-2")), 52.52),
+ Map.entry(toClientEntity(toUserMap(null),
toClientIdMap("client-id-1")), 53.53),
+ Map.entry(toClientEntity(toUserMap("user-1"), toClientIdMap(null)),
54.54),
+ Map.entry(toClientEntity(toUserMap("user-3"), toClientIdMap(null)),
55.55),
+ Map.entry(toClientEntity(toUserMap("user-1")), 56.56),
+ Map.entry(toClientEntity(toUserMap("user-2")), 57.57),
+ Map.entry(toClientEntity(toUserMap("user-3")), 58.58),
+ Map.entry(toClientEntity(toUserMap(null)), 59.59),
+ Map.entry(toClientEntity(toClientIdMap("client-id-2")), 60.60)
+ ));
+
+ private final Map<ClientQuotaEntity, Double> matchIpEntities = Map.of(
+ toIpEntity(Optional.of("1.2.3.4")), 10.0,
+ toIpEntity(Optional.of("2.3.4.5")), 20.0
+ );
+
+ private void setupDescribeClientQuotasMatchTest() {
+ Map<ClientQuotaEntity, Map<String, Optional<Double>>> userClientQuotas
= matchUserClientEntities.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> Map.of(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG,
Optional.of(e.getValue()))));
+
+ Map<ClientQuotaEntity, Map<String, Optional<Double>>> ipQuotas =
matchIpEntities.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> Map.of(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG,
Optional.of(e.getValue()))));
+
+ Map<ClientQuotaEntity, Map<String, Optional<Double>>> allQuotas = new
HashMap<>();
+ allQuotas.putAll(userClientQuotas);
+ allQuotas.putAll(ipQuotas);
+ Map<ClientQuotaEntity, KafkaFuture<Void>> result =
alterClientQuotas(allQuotas, false);
+
+ matchUserClientEntities.forEach((entity, value) -> {
+ try {
+ result.get(entity).get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ matchIpEntities.forEach((entity, value) -> {
+ try {
+ result.get(entity).get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ private Map<ClientQuotaEntity, Map<String, Double>>
matchEntity(ClientQuotaEntity entity)
+ throws ExecutionException, InterruptedException {
+ List<ClientQuotaFilterComponent> components =
entity.entries().entrySet().stream().map(entry -> {
+ if (entry.getValue() == null) {
+ return
ClientQuotaFilterComponent.ofDefaultEntity(entry.getKey());
+ } else {
+ return ClientQuotaFilterComponent.ofEntity(entry.getKey(),
entry.getValue());
+ }
+ }).toList();
+
+ return
describeClientQuotas(ClientQuotaFilter.containsOnly(components)).get();
+ }
+
+ @ClusterTest
+ public void testDescribeClientQuotasMatchExact() throws
ExecutionException, InterruptedException {
+ setupDescribeClientQuotasMatchTest();
+
+ // Test exact matches.
+ matchUserClientEntities.forEach((e, v) -> {
+ try {
+ TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
+ Map<ClientQuotaEntity, Map<String, Double>> result =
matchEntity(e);
+ assertEquals(1, result.size());
+ assertNotNull(result.get(e));
+ double value =
result.get(e).get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
+ assertEquals(value, v, 1e-6);
+ });
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+
+ // Entities not contained in `matchEntityList`.
+ List<ClientQuotaEntity> notMatchEntities = List.of(
+ toClientEntity(toUserMap("user-1"), toClientIdMap("client-id-2")),
+ toClientEntity(toUserMap("user-3"), toClientIdMap("client-id-1")),
+ toClientEntity(toUserMap("user-2"), toClientIdMap(null)),
+ toClientEntity(toUserMap("user-4")),
+ toClientEntity(toUserMap(null), toClientIdMap("client-id-2")),
+ toClientEntity(toClientIdMap("client-id-1")),
+ toClientEntity(toClientIdMap("client-id-3"))
+ );
+
+ // Verify exact matches of the non-matches returns empty.
+ for (ClientQuotaEntity e : notMatchEntities) {
+ Map<ClientQuotaEntity, Map<String, Double>> result =
matchEntity(e);
+ assertEquals(0, result.size());
+ }
+ }
+
+ private void testMatchEntities(ClientQuotaFilter filter, int
expectedMatchSize, Predicate<ClientQuotaEntity> partition)
+ throws InterruptedException {
+ TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
+ Map<ClientQuotaEntity, Map<String, Double>> result =
describeClientQuotas(filter).get();
+ List<Map.Entry<ClientQuotaEntity, Double>> expectedMatches =
matchUserClientEntities.entrySet()
+ .stream()
+ .collect(Collectors.partitioningBy(entry ->
partition.test(entry.getKey())))
+ .get(true);
+ expectedMatches.addAll(matchIpEntities.entrySet()
+ .stream()
+ .collect(Collectors.partitioningBy(entry ->
partition.test(entry.getKey())))
+ .get(true));
+
+ // for test verification
+ assertEquals(expectedMatchSize, expectedMatches.size());
+ assertEquals(expectedMatchSize, result.size(),
+ "Failed to match " + expectedMatchSize + "entities for " +
filter);
+ Map<Object, Object> expectedMatchesMap =
Map.ofEntries(expectedMatches.toArray(new Map.Entry[0]));
+ matchUserClientEntities.forEach((entity, expectedValue) -> {
+ if (expectedMatchesMap.containsKey(entity)) {
+ Map<String, Double> config = result.get(entity);
+ assertNotNull(config);
+ Double value =
config.get(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG);
+ assertNotNull(value);
+ assertEquals(expectedValue, value, 1e-6);
+ } else {
+ assertNull(result.get(entity));
+ }
+ });
+ matchIpEntities.forEach((entity, expectedValue) -> {
+ if (expectedMatchesMap.containsKey(entity)) {
+ Map<String, Double> config = result.get(entity);
+ assertNotNull(config);
+ Double value =
config.get(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG);
+ assertNotNull(value);
+ assertEquals(expectedValue, value, 1e-6);
+ } else {
+ assertNull(result.get(entity));
+ }
+ });
+ });
+ }
+
+ @ClusterTest
+ public void testDescribeClientQuotasMatchPartial() throws
InterruptedException {
+ setupDescribeClientQuotasMatchTest();
+
+ // Match open-ended existing user.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"user-1"))),
+ 3,
+ entity ->
Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-1")
+ );
+
+ // Match open-ended non-existent user.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"unknown"))),
+ 0,
+ entity -> false
+ );
+
+ // Match open-ended existing client ID.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID,
"client-id-2"))),
+ 2,
+ entity ->
Objects.equals(entity.entries().get(ClientQuotaEntity.CLIENT_ID), "client-id-2")
+ );
+
+ // Match open-ended default user.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofDefaultEntity(ClientQuotaEntity.USER))),
+ 2,
+ entity -> entity.entries().containsKey(ClientQuotaEntity.USER) &&
entity.entries().get(ClientQuotaEntity.USER) == null
+ );
+
+ // Match close-ended existing user.
+ testMatchEntities(
+
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.USER,
"user-2"))),
+ 1,
+ entity ->
Objects.equals(entity.entries().get(ClientQuotaEntity.USER), "user-2") &&
!entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
+ );
+
+ // Match close-ended existing client ID that has no matching entity.
+ testMatchEntities(
+
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntity(ClientQuotaEntity.CLIENT_ID,
"client-id-1"))),
+ 0,
+ entity -> false
+ );
+
+ // Match against all entities with the user type in a close-ended
match.
+ testMatchEntities(
+
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))),
+ 4,
+ entity -> entity.entries().containsKey(ClientQuotaEntity.USER) &&
!entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
+ );
+
+ // Match against all entities with the user type in an open-ended
match.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.USER))),
+ 10,
+ entity -> entity.entries().containsKey(ClientQuotaEntity.USER)
+ );
+
+ // Match against all entities with the client ID type in a close-ended
match.
+ testMatchEntities(
+
ClientQuotaFilter.containsOnly(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))),
+ 1,
+ entity ->
entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID) &&
!entity.entries().containsKey(ClientQuotaEntity.USER)
+ );
+
+ // Match against all entities with the client ID type in an open-ended
match.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.CLIENT_ID))),
+ 7,
+ entity -> entity.entries().containsKey(ClientQuotaEntity.CLIENT_ID)
+ );
+
+ // Match against all entities with IP type in an open-ended match.
+ testMatchEntities(
+
ClientQuotaFilter.contains(List.of(ClientQuotaFilterComponent.ofEntityType(ClientQuotaEntity.IP))),
+ 2,
+ entity -> entity.entries().containsKey(ClientQuotaEntity.IP)
+ );
+
+ // Match open-ended empty filter List. This should match all entities.
+ testMatchEntities(ClientQuotaFilter.contains(List.of()), 13, entity ->
true);
+
+ // Match close-ended empty filter List. This should match no entities.
+ testMatchEntities(ClientQuotaFilter.containsOnly(List.of()), 0, entity
-> false);
+ }
+
+ @ClusterTest
+ public void testClientQuotasUnsupportedEntityTypes() {
+ ClientQuotaEntity entity = new ClientQuotaEntity(Map.of("other",
"name"));
+ KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> future =
describeClientQuotas(
+ ClientQuotaFilter.containsOnly(getComponents(entity)));
+
+ TestUtils.assertFutureThrows(UnsupportedVersionException.class,
future);
+ }
+
+ @ClusterTest
+ public void testClientQuotasSanitized() throws InterruptedException {
+ // An entity with name that must be sanitized when writing to
Zookeeper.
+ ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "user with spaces"));
+
+ alterEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG,
Optional.of(20000.0)
+ ), false);
+
+ verifyDescribeEntityQuotas(entity, Map.of(
+ QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG, 20000.0
+ ));
+ }
+
+ private Map<String, String> toUserMap(String user) {
+ // Uses Collections.singletonMap instead of Map.of to support null
user parameter.
+ return Collections.singletonMap(ClientQuotaEntity.USER, user);
+ }
+
+ private Map<String, String> toClientIdMap(String clientId) {
+ // Uses Collections.singletonMap instead of Map.of to support null
client-id parameter.
+ return Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, clientId);
+ }
+
+ @SafeVarargs
+ private ClientQuotaEntity toClientEntity(Map<String, String>... entries) {
+ Map<String, String> entityMap = new HashMap<>();
+ for (Map<String, String> entry : entries) {
+ entityMap.putAll(entry);
+ }
+ return new ClientQuotaEntity(entityMap);
+ }
+
+ private ClientQuotaEntity toIpEntity(Optional<String> ip) {
+ return new
ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP,
ip.orElse(null)));
+ }
+
+ private void verifyDescribeEntityQuotas(ClientQuotaEntity entity,
Map<String, Double> quotas)
+ throws InterruptedException {
+ TestUtils.retryOnExceptionWithTimeout(5000L, () -> {
+
+ Map<ClientQuotaEntity, Map<String, Double>> describe =
describeClientQuotas(
+ ClientQuotaFilter.containsOnly(getComponents(entity))).get();
+ if (quotas.isEmpty()) {
+ assertEquals(0, describe.size());
+ } else {
+ assertEquals(1, describe.size());
+ Map<String, Double> configs = describe.get(entity);
+ assertNotNull(configs);
+ assertEquals(quotas.size(), configs.size());
+
+ quotas.forEach((k, v) -> {
+ Double value = configs.get(k);
+ assertNotNull(value);
+ assertEquals(v, value, 1e-6);
+ });
+ }
+ });
+ }
+
+ private List<ClientQuotaFilterComponent> getComponents(ClientQuotaEntity
entity) {
+ return entity.entries().entrySet().stream().map(entry -> {
+ String entityType = entry.getKey();
+ String entityName = entry.getValue();
+ return Optional.ofNullable(entityName)
+ .map(name -> ClientQuotaFilterComponent.ofEntity(entityType,
name))
+ .orElseGet(() ->
ClientQuotaFilterComponent.ofDefaultEntity(entityType));
+ }).toList();
+ }
+
+ private KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>>
describeClientQuotas(ClientQuotaFilter filter) {
+ try (Admin admin = cluster.admin()) {
+ return admin.describeClientQuotas(filter).entities();
+ }
+ }
+
+ private KafkaFuture<Void> alterEntityQuotas(ClientQuotaEntity entity,
Map<String, Optional<Double>> alter, boolean validateOnly) {
+
+ return alterClientQuotas(Map.of(entity, alter),
validateOnly).get(entity);
+ }
+
+ private Map<ClientQuotaEntity, KafkaFuture<Void>>
alterClientQuotas(Map<ClientQuotaEntity, Map<String,
+ Optional<Double>>> request, boolean validateOnly) {
+
+ List<ClientQuotaAlteration> entries =
request.entrySet().stream().map(entry -> {
+ ClientQuotaEntity entity = entry.getKey();
+ Map<String, Optional<Double>> alter = entry.getValue();
+
+ List<ClientQuotaAlteration.Op> ops = alter.entrySet()
+ .stream()
+ .map(configEntry -> new
ClientQuotaAlteration.Op(configEntry.getKey(),
+ configEntry.getValue().orElse(null)))
+ .collect(Collectors.toList());
+ return new ClientQuotaAlteration(entity, ops);
+ }).collect(Collectors.toList());
+
+ try (Admin admin = cluster.admin()) {
+ Map<ClientQuotaEntity, KafkaFuture<Void>> result =
admin.alterClientQuotas(entries,
+ new
AlterClientQuotasOptions().validateOnly(validateOnly)).values();
+ assertEquals(request.size(), result.size());
+ request.forEach((e, r) -> assertTrue(result.containsKey(e)));
+ return result;
+ }
+ }
+}