Repository: kafka Updated Branches: refs/heads/trunk 3c8925946 -> 8b75a016d
KAFKA-4267; Quota initialization for <user, clientId> uses incorrect ZK path Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1987 from rajinisivaram/quota-init-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8b75a016 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8b75a016 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8b75a016 Branch: refs/heads/trunk Commit: 8b75a016db16e20c5d5180deb6859bf0ad4c48fd Parents: 3c89259 Author: Rajini Sivaram <[email protected]> Authored: Fri Oct 7 12:10:53 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Fri Oct 7 12:10:53 2016 +0100 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AdminUtils.scala | 2 +- .../kafka/server/DynamicConfigChangeTest.scala | 31 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 96f09b0..aa38f69 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -608,7 +608,7 @@ object AdminUtils extends Logging with AdminUtilities { def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = { def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = { val root = rootPath match { - case Some(path) => rootEntityType + '/' + rootPath + case Some(path) => rootEntityType + '/' + path case None => rootEntityType } val entityNames = zkUtils.getAllEntitiesWithConfig(root) http://git-wip-us.apache.org/repos/asf/kafka/blob/8b75a016/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 4b44b1f..faa23f0 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -129,6 +129,37 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } @Test + def testQuotaInitialization() { + val server = servers.head + val clientIdProps = new Properties() + server.shutdown() + clientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000") + clientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000") + val userProps = new Properties() + userProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "10000") + userProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "20000") + val userClientIdProps = new Properties() + userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000") + userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000") + + AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps) + + // Remove config change znodes to force quota initialization only through loading of user/client quotas + zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) } + server.startup() + val quotaManagers = server.apis.quotas + + assertEquals(Quota.upperBound(1000), quotaManagers.produce.quota("someuser", "overriddenClientId")) + assertEquals(Quota.upperBound(2000), quotaManagers.fetch.quota("someuser", "overriddenClientId")) + assertEquals(Quota.upperBound(10000), quotaManagers.produce.quota("overriddenUser", "someclientId")) + assertEquals(Quota.upperBound(20000), quotaManagers.fetch.quota("overriddenUser", "someclientId")) + assertEquals(Quota.upperBound(100000), quotaManagers.produce.quota("ANONYMOUS", "overriddenUserClientId")) + assertEquals(Quota.upperBound(200000), quotaManagers.fetch.quota("ANONYMOUS", "overriddenUserClientId")) + } + + @Test def testConfigChangeOnNonExistingTopic() { val topic = TestUtils.tempTopic try {
