Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe merged PR #15584: URL: https://github.com/apache/kafka/pull/15584 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe commented on PR #15584: URL: https://github.com/apache/kafka/pull/15584#issuecomment-2018746846 > I don't think it's a good idea to introduce the new terms mangling & unmangling when there are already equivalent terms in the codebase – sanitizing/desanitizing – it makes it unnecessarily confusing. That's a fair point. I will remove the references to "mangling" and replace them with "sanitization" (although I don't really agree, I think "sanitization" implies discarding bad data, not mangling it) But let's not change ZK terminology at this point :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe commented on code in PR #15584: URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580 ## core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala: ## @@ -50,44 +51,54 @@ class ZkConfigMigrationClient( val adminZkClient = new AdminZkClient(zkClient) - /** - * In ZK, we use the special string "default" to represent the default entity. - * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string - * to the special KRaft string. + * In ZK, we use the special string "default" to represent the default config entity. + * In KRaft, we use an empty string. This method converts the between the two conventions. */ - private def fromZkEntityName(entityName: String): String = { -if (entityName.equals(ConfigEntityName.DEFAULT)) { + private def fromZkConfigfEntityName(entityName: String): String = { +if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) { "" } else { entityName } } - private def toZkEntityName(entityName: String): String = { + private def toZkConfigEntityName(entityName: String): String = { if (entityName.isEmpty) { - ConfigEntityName.DEFAULT + ZooKeeperInternals.DEFAULT_STRING } else { entityName } } - private def buildEntityData(entityType: String, entityName: String): EntityData = { -new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName)) + private def buildClientQuotaEntityData( +entityType: String, +znodeName: String + ): EntityData = { +val result = new EntityData().setEntityType(entityType) +if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) { + // Default __client quota__ entity names are null. This is different than default __configs__, + // which have their names set to the empty string instead. + result.setEntityName(null) +} else { + // ZNode names are mangled before being stored in ZooKeeper. + // For example, @ is turned into %40. Undo the mangling here. + result.setEntityName(Sanitizer.desanitize(znodeName)) +} +result } - override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = { def migrateEntityType(zkEntityType: String, entityType: String): Unit = { - adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) => -val entity = List(buildEntityData(entityType, name)).asJava + adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) => +val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") { - visitor.visitScramCredential(name, mechanism, scramCredentials) + visitor.visitScramCredential(Sanitizer.desanitize(znodeName), mechanism, scramCredentials) Review Comment: That is correct. The previous change to `ZkMigrationClient` from KAFKA-16222 has been removed in this PR, as you can see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
cmccabe commented on code in PR #15584: URL: https://github.com/apache/kafka/pull/15584#discussion_r1538115580 ## core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala: ## @@ -50,44 +51,54 @@ class ZkConfigMigrationClient( val adminZkClient = new AdminZkClient(zkClient) - /** - * In ZK, we use the special string "default" to represent the default entity. - * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string - * to the special KRaft string. + * In ZK, we use the special string "default" to represent the default config entity. + * In KRaft, we use an empty string. This method converts the between the two conventions. */ - private def fromZkEntityName(entityName: String): String = { -if (entityName.equals(ConfigEntityName.DEFAULT)) { + private def fromZkConfigfEntityName(entityName: String): String = { +if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) { "" } else { entityName } } - private def toZkEntityName(entityName: String): String = { + private def toZkConfigEntityName(entityName: String): String = { if (entityName.isEmpty) { - ConfigEntityName.DEFAULT + ZooKeeperInternals.DEFAULT_STRING } else { entityName } } - private def buildEntityData(entityType: String, entityName: String): EntityData = { -new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName)) + private def buildClientQuotaEntityData( +entityType: String, +znodeName: String + ): EntityData = { +val result = new EntityData().setEntityType(entityType) +if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) { + // Default __client quota__ entity names are null. This is different than default __configs__, + // which have their names set to the empty string instead. + result.setEntityName(null) +} else { + // ZNode names are mangled before being stored in ZooKeeper. + // For example, @ is turned into %40. Undo the mangling here. + result.setEntityName(Sanitizer.desanitize(znodeName)) +} +result } - override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = { def migrateEntityType(zkEntityType: String, entityType: String): Unit = { - adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) => -val entity = List(buildEntityData(entityType, name)).asJava + adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) => +val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") { - visitor.visitScramCredential(name, mechanism, scramCredentials) + visitor.visitScramCredential(Sanitizer.desanitize(znodeName), mechanism, scramCredentials) Review Comment: That is correct. The previous change from KAFKA-16222 has been removed in this PR, as you can see. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16411: Correctly migrate default client quota entities [kafka]
soarez commented on code in PR #15584: URL: https://github.com/apache/kafka/pull/15584#discussion_r1537526255 ## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ## @@ -324,6 +325,68 @@ class KRaftClusterTest { } } + def setConsumerByteRate( +admin: Admin, +entity: ClientQuotaEntity, +value: Long + ): Unit = { +admin.alterClientQuotas(Collections.singletonList( + new ClientQuotaAlteration(entity, Collections.singletonList( +new Op("consumer_byte_rate", value.doubleValue()). +all().get() + } + + def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = { +val allFilter = ClientQuotaFilter.contains(Collections.emptyList()) +val results = new java.util.HashMap[ClientQuotaEntity, Long] +admin.describeClientQuotas(allFilter).entities().get().forEach { + case (entity, entityMap) => +Option(entityMap.get("consumer_byte_rate")).foreach { + case value => results.put(entity, value.longValue()) +} +} +results.asScala.toMap + } + + @Test + def testDefaultClientQuotas(): Unit = { +val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). +setNumBrokerNodes(1). +setNumControllerNodes(1).build()).build() +try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING, +"Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { +val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null)) +val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob")) Review Comment: Should there also be a test that exercises the sanitizing/desanitizing or mangling/unmangling? ## core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala: ## @@ -50,44 +51,54 @@ class ZkConfigMigrationClient( val adminZkClient = new AdminZkClient(zkClient) - /** - * In ZK, we use the special string "default" to represent the default entity. - * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string - * to the special KRaft string. + * In ZK, we use the special string "default" to represent the default config entity. + * In KRaft, we use an empty string. This method converts the between the two conventions. */ - private def fromZkEntityName(entityName: String): String = { -if (entityName.equals(ConfigEntityName.DEFAULT)) { + private def fromZkConfigfEntityName(entityName: String): String = { +if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) { "" } else { entityName } } - private def toZkEntityName(entityName: String): String = { + private def toZkConfigEntityName(entityName: String): String = { if (entityName.isEmpty) { - ConfigEntityName.DEFAULT + ZooKeeperInternals.DEFAULT_STRING } else { entityName } } - private def buildEntityData(entityType: String, entityName: String): EntityData = { -new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName)) + private def buildClientQuotaEntityData( +entityType: String, +znodeName: String + ): EntityData = { +val result = new EntityData().setEntityType(entityType) +if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) { + // Default __client quota__ entity names are null. This is different than default __configs__, + // which have their names set to the empty string instead. + result.setEntityName(null) +} else { + // ZNode names are mangled before being stored in ZooKeeper. + // For example, @ is turned into %40. Undo the mangling here. + result.setEntityName(Sanitizer.desanitize(znodeName)) +} +result } - override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = { def migrateEntityType(zkEntityType: String, entityType: String): Unit = { - adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) => -val entity = List(buildEntityData(entityType, name)).asJava + adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) => +val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") { - visitor.visitScramCredential(name, mechanism, scramCredentials) +