This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new c74a9832df4 KAFKA-16222: desanitize entity name when migrate client 
quotas (#15481)
c74a9832df4 is described below

commit c74a9832df474367da1feb3ee85f9f5e98a24968
Author: PoAn Yang <pay...@apache.org>
AuthorDate: Wed Mar 20 14:53:23 2024 +0800

    KAFKA-16222: desanitize entity name when migrate client quotas (#15481)
    
    The entity name is sanitized when it's in Zk mode.
    We didn't desanitize it when we migrate client quotas. Add 
Sanitizer.desanitize to fix it.
    
    Reviewers: Luke Chen <show...@gmail.com>
---
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |  4 +++
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 31 ++++++++++++++--------
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala 
b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index a11a84c017b..76e0b47aee8 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.metadata._
 import org.apache.kafka.common.resource.ResourcePattern
 import org.apache.kafka.common.security.scram.ScramCredential
+import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.common.{TopicIdPartition, Uuid}
 import org.apache.kafka.metadata.DelegationTokenData
 import org.apache.kafka.metadata.PartitionRegistration
@@ -225,6 +226,9 @@ class ZkMigrationClient(
         entityDataList: util.List[ClientQuotaRecord.EntityData],
         quotas: util.Map[String, lang.Double]
       ): Unit = {
+        entityDataList.forEach(entityData => {
+          
entityData.setEntityName(Sanitizer.desanitize(entityData.entityName()))
+        })
         val batch = new util.ArrayList[ApiMessageAndVersion]()
         quotas.forEach((key, value) => {
           batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
diff --git 
a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index 4c3ddd8b80d..f8fe82ed9d0 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import kafka.security.authorizer.AclEntry.{WildcardHost, 
WildcardPrincipalString}
-import kafka.server.{ConfigType, ControllerRequestCompletionHandler, 
KafkaConfig}
+import kafka.server.{ConfigEntityName, ConfigType, 
ControllerRequestCompletionHandler, KafkaConfig}
 import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.test.annotation.{AutoStart, ClusterConfigProperty, 
ClusterTemplate, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
@@ -54,7 +54,6 @@ import org.slf4j.LoggerFactory
 import java.util
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
 import java.util.{Collections, Optional, Properties, UUID}
-import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object ZkMigrationIntegrationTest {
@@ -217,15 +216,19 @@ class ZkMigrationIntegrationTest {
     createTopicResult.all().get(60, TimeUnit.SECONDS)
 
     val quotas = new util.ArrayList[ClientQuotaAlteration]()
-    quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1").asJava),
-      List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
-    quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> 
"clientA").asJava),
+    val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER 
-> ConfigEntityName.Default).asJava)
+    quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new 
ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
+    val defaultClientIdEntity = new 
ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> 
ConfigEntityName.Default).asJava)
+    quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new 
ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
+    val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> 
null.asInstanceOf[String]).asJava)
+    quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new 
ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava))
+    val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> 
"user/1@prod").asJava)
+    quotas.add(new ClientQuotaAlteration(userEntity, List(new 
ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
+    val userClientEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> 
"user/1@prod", ClientQuotaEntity.CLIENT_ID -> "client/1@domain").asJava)
+    quotas.add(new ClientQuotaAlteration(userClientEntity,
       List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new 
ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
-    quotas.add(new ClientQuotaAlteration(
-      new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
-      List(new ClientQuotaAlteration.Op("connection_creation_rate", 
10.0)).asJava))
+    val ipEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> 
"8.8.8.8").asJava)
+    quotas.add(new ClientQuotaAlteration(ipEntity, List(new 
ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
     admin.alterClientQuotas(quotas).all().get(60, TimeUnit.SECONDS)
 
     val zkClient = 
clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
@@ -261,7 +264,13 @@ class ZkMigrationIntegrationTest {
       assertEquals(10, 
image.topics().getTopic("test-topic-3").partitions().size())
 
       val clientQuotas = image.clientQuotas().entities()
-      assertEquals(3, clientQuotas.size())
+      assertEquals(6, clientQuotas.size())
+      assertEquals(true, clientQuotas.containsKey(defaultUserEntity))
+      assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity))
+      assertEquals(true, clientQuotas.containsKey(new 
ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip
+      assertEquals(true, clientQuotas.containsKey(userEntity))
+      assertEquals(true, clientQuotas.containsKey(userClientEntity))
+      assertEquals(true, clientQuotas.containsKey(ipEntity))
     }
 
     migrationState = 
migrationClient.releaseControllerLeadership(migrationState)

Reply via email to