Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-26 Thread via GitHub


cmccabe merged PR #15584:
URL: https://github.com/apache/kafka/pull/15584


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


cmccabe commented on PR #15584:
URL: https://github.com/apache/kafka/pull/15584#issuecomment-2018746846

   > I don't think it's a good idea to introduce the new terms mangling & unmangling when there are already equivalent terms in the codebase – sanitizing/desanitizing – it makes it unnecessarily confusing.
   
   That's a fair point. I will remove the references to "mangling" and replace them with "sanitization".
   
   (Although I don't really agree: I think "sanitization" implies discarding bad data, not mangling it.)
   
   But let's not change ZK terminology at this point :)
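The sanitization being discussed is percent-encoding of entity names before they become ZooKeeper znode names (the diff comment gives the example `@` → `%40`). A minimal standalone sketch of that behavior, using `java.net.URLEncoder`/`URLDecoder` as a stand-in for Kafka's `Sanitizer` (the class name `SanitizeSketch` and the exact encoding details here are assumptions, not the real Kafka implementation):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical stand-in for org.apache.kafka.common.utils.Sanitizer.
public class SanitizeSketch {
    // Percent-encode a name so it is safe to store as a ZooKeeper znode name.
    static String sanitize(String name) {
        try {
            return URLEncoder.encode(name, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e); // UTF-8 is always present
        }
    }

    // Undo the encoding when reading the znode name back.
    static String desanitize(String znodeName) {
        try {
            return URLDecoder.decode(znodeName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // '@' becomes %40, matching the example in the review diff.
        System.out.println(sanitize("user@example.com"));   // user%40example.com
        System.out.println(desanitize("user%40example.com")); // user@example.com
    }
}
```

This also illustrates why "sanitize" is a slightly odd name: nothing is discarded, the name is reversibly encoded.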





Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


cmccabe commented on code in PR #15584:
URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580


##
core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala:
##
@@ -50,44 +51,54 @@ class ZkConfigMigrationClient(
 
   val adminZkClient = new AdminZkClient(zkClient)
 
-
   /**
-   * In ZK, we use the special string "<default>" to represent the default entity.
-   * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string
-   * to the special KRaft string.
+   * In ZK, we use the special string "<default>" to represent the default config entity.
+   * In KRaft, we use an empty string. This method converts between the two conventions.
    */
-  private def fromZkEntityName(entityName: String): String = {
-    if (entityName.equals(ConfigEntityName.DEFAULT)) {
+  private def fromZkConfigEntityName(entityName: String): String = {
+    if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
       ""
     } else {
       entityName
     }
   }
 
-  private def toZkEntityName(entityName: String): String = {
+  private def toZkConfigEntityName(entityName: String): String = {
     if (entityName.isEmpty) {
-      ConfigEntityName.DEFAULT
+      ZooKeeperInternals.DEFAULT_STRING
     } else {
       entityName
     }
   }
 
-  private def buildEntityData(entityType: String, entityName: String): EntityData = {
-    new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
+  private def buildClientQuotaEntityData(
+    entityType: String,
+    znodeName: String
+  ): EntityData = {
+    val result = new EntityData().setEntityType(entityType)
+    if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
+      // Default __client quota__ entity names are null. This is different from default __configs__,
+      // which have their names set to the empty string instead.
+      result.setEntityName(null)
+    } else {
+      // ZNode names are mangled before being stored in ZooKeeper.
+      // For example, @ is turned into %40. Undo the mangling here.
+      result.setEntityName(Sanitizer.desanitize(znodeName))
+    }
+    result
   }
 
-
   override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
     def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
-      adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) =>
-        val entity = List(buildEntityData(entityType, name)).asJava
+      adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) =>
+        val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava
 
         ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
           val propertyValue = props.getProperty(mechanism.mechanismName)
           if (propertyValue != null) {
             val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
             logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") {
-              visitor.visitScramCredential(name, mechanism, scramCredentials)
+              visitor.visitScramCredential(Sanitizer.desanitize(znodeName), mechanism, scramCredentials)
Review Comment:
   That is correct. The previous change to `ZkMigrationClient` from KAFKA-16222 
has been removed in this PR, as you can see.
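The branch at the heart of the diff above — default znode names become null entity names, everything else gets desanitized — can be sketched on its own. The helper name `entityNameFromZnode`, the class name, and the use of percent-decoding in place of `Sanitizer.desanitize` are assumptions for illustration; only the `"<default>"` placeholder and the null-for-default convention come from the diff itself:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Illustrative sketch of the znode-name-to-entity-name conversion shown in the diff.
public class QuotaEntityNameSketch {
    // ZooKeeper-internal placeholder for the default entity.
    static final String DEFAULT_STRING = "<default>";

    // Returns null for the default entity, otherwise the desanitized name.
    static String entityNameFromZnode(String znodeName) {
        if (znodeName.equals(DEFAULT_STRING)) {
            return null; // default client quota entities have a null name in KRaft
        }
        try {
            // Undo the %40-style mangling applied before storage in ZooKeeper.
            return URLDecoder.decode(znodeName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(entityNameFromZnode("<default>"));   // null
        System.out.println(entityNameFromZnode("user%40host")); // user@host
    }
}
```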









Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-25 Thread via GitHub


soarez commented on code in PR #15584:
URL: https://github.com/apache/kafka/pull/15584#discussion_r1537526255


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -324,6 +325,68 @@ class KRaftClusterTest {
     }
   }
 
+  def setConsumerByteRate(
+    admin: Admin,
+    entity: ClientQuotaEntity,
+    value: Long
+  ): Unit = {
+    admin.alterClientQuotas(Collections.singletonList(
+      new ClientQuotaAlteration(entity, Collections.singletonList(
+        new Op("consumer_byte_rate", value.doubleValue()))))).
+      all().get()
+  }
+
+  def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
+    val allFilter = ClientQuotaFilter.contains(Collections.emptyList())
+    val results = new java.util.HashMap[ClientQuotaEntity, Long]
+    admin.describeClientQuotas(allFilter).entities().get().forEach {
+      case (entity, entityMap) =>
+        Option(entityMap.get("consumer_byte_rate")).foreach {
+          case value => results.put(entity, value.longValue())
+        }
+    }
+    results.asScala.toMap
+  }
+
+  @Test
+  def testDefaultClientQuotas(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null))
+        val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob"))
Review Comment:
   Should there also be a test that exercises the sanitizing/desanitizing or 
mangling/unmangling?
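Outside the cluster harness, the round-trip property such a sanitization test would exercise can be sketched in isolation. This uses URL-encoding as a stand-in for Kafka's `Sanitizer` (an assumption; a real test would go through the Admin API against a migrated cluster, as in the quoted test above):

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Standalone sketch of the sanitize/desanitize round-trip property.
public class RoundTripSketch {
    static String sanitize(String s) throws UnsupportedEncodingException {
        return URLEncoder.encode(s, "UTF-8");
    }

    static String desanitize(String s) throws UnsupportedEncodingException {
        return URLDecoder.decode(s, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        String[] names = {"bob", "user@host", "a=b", "100%cpu"};
        for (String name : names) {
            String znode = sanitize(name);
            // The stored (mangled) form must decode back to the original name.
            if (!desanitize(znode).equals(name)) {
                throw new AssertionError("round trip failed for " + name);
            }
            System.out.println(name + " -> " + znode);
        }
    }
}
```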



[PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]

2024-03-22 Thread via GitHub


cmccabe opened a new pull request, #15584:
URL: https://github.com/apache/kafka/pull/15584

   KAFKA-16222 fixed a bug whereby we didn't undo the name mangling used on client quota entity names stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota entities. It also failed to correctly re-mangle when synchronizing the data back to ZooKeeper.
   
   This PR fixes ZkConfigMigrationClient to do the mangling correctly on both the read and write paths. We do unmangling before invoking the visitors, since after all it does not make sense to do the same unmangling step in each and every visitor.
   
   Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example, ClientQuotaEntity(user -> null) is stored under the /config/users/<default> znode in ZooKeeper. In KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null). Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users, entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow users to name themselves with the empty string, sadly.)
   
   The confusion appears to have arisen because for TOPIC and BROKER configurations, the default ConfigResource is indeed the one named with the empty (not null) string. For example, the default topic configuration resource is ConfigResource(name="", type=TOPIC). However, things are different for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented by maps with null values. For example, the default User entity is represented by Map("user" -> null). In retrospect, using a map with null values was a poor choice; a Map<String, Optional<String>> would have made more sense. However, this is the way the API currently is and we have to convert correctly.
   
   There was an additional level of confusion present in KAFKA-16222, where someone thought that using the ZooKeeper placeholder string "<default>" in the AdminClient API would yield a default client quota entity. This seems to have been perpetuated by the ConfigEntityName class that was created recently. In fact, <default> is not part of any public API in Kafka. Accordingly, this PR also renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the string <default> is just a detail of the ZooKeeper implementation. It is not used in the Kafka API to indicate defaults. Hopefully this will avoid confusion in the future.
   
   Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of setting default client quotas.
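The distinction the PR description draws — a null map value means "the default entity", while the empty string is a legal literal user name — can be sketched as follows. The helper `toZnodeName` and the class name are hypothetical, written only to mirror the conventions described above:

```java
import java.util.Collections;
import java.util.Map;

// Sketch of the representation difference the PR describes: default client
// quota entities use a null map value, while "" is a legal literal user name.
public class DefaultEntitySketch {
    // ZooKeeper-internal placeholder (ZooKeeperInternals.DEFAULT_STRING in the PR).
    static final String ZK_DEFAULT = "<default>";

    // Hypothetical helper mapping a KRaft/AdminClient entity name to the
    // znode name used on the ZooKeeper side.
    static String toZnodeName(String entityName) {
        return entityName == null ? ZK_DEFAULT : entityName;
    }

    public static void main(String[] args) {
        // Collections.singletonMap permits null values, so this mirrors
        // ClientQuotaEntity(Collections.singletonMap("user", null)).
        Map<String, String> defaultUser = Collections.singletonMap("user", null);
        Map<String, String> emptyNamedUser = Collections.singletonMap("user", "");

        // The default entity and the empty-string-named user must stay distinct.
        System.out.println(toZnodeName(defaultUser.get("user")));    // <default>
        System.out.println(toZnodeName(emptyNamedUser.get("user"))); // (empty string)
    }
}
```

The bug fixed by this PR was precisely a collapse of these two cases: the null-named default entity was being rewritten with an empty-string name.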

