Repository: kafka Updated Branches: refs/heads/trunk 537f98a5d -> 294018a57
KAFKA-4864; added correct zookeeper nodes for security migrator Author: simplesteph <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #2655 from simplesteph/fix-security-migrator-tool Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/294018a5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/294018a5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/294018a5 Branch: refs/heads/trunk Commit: 294018a578cbbc187ac123a6b99990468186e349 Parents: 537f98a Author: simplesteph <[email protected]> Authored: Wed Mar 8 21:41:38 2017 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Mar 8 21:41:38 2017 -0800 ---------------------------------------------------------------------- .../src/main/scala/kafka/admin/AdminUtils.scala | 2 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 4 +- .../security/auth/SimpleAclAuthorizer.scala | 4 +- .../kafka/server/DynamicConfigManager.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 56 ++++++++++++-------- .../SaslScramSslEndToEndAuthorizationTest.scala | 2 +- .../unit/kafka/admin/TopicCommandTest.scala | 2 +- .../security/auth/ZkAuthorizationTest.scala | 6 +-- .../kafka/server/DynamicConfigChangeTest.scala | 2 +- 9 files changed, 46 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 65ac91c..d4ae4ff 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -565,7 +565,7 @@ object AdminUtils extends Logging with AdminUtilities { writeEntityConfig(zkUtils, entityConfigPath, configs) // create the change notification - val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix + val seqNode = ZkUtils.ConfigChangesPath + "/" + EntityConfigChangeZnodePrefix val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath)) zkUtils.zkClient.createPersistentSequential(seqNode, content) } http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 9bd321c..eb5c142 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -220,12 +220,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { private def run(): Unit = { try { setAclIndividually("/") - for (path <- zkUtils.securePersistentZkPaths) { + for (path <- ZkUtils.SecureZkRootPaths) { debug("Going to set ACL for %s".format(path)) zkUtils.makeSurePersistentPathExists(path) setAclsRecursively(path) } - + @tailrec def recurse(): Unit = { val future = futures.synchronized { http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 7ae4796..51de3bc 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -56,10 +56,10 @@ object SimpleAclAuthorizer { * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", "permissionType": "Allow","operation": "Read","principal": "User:alice"}]} * </pre> */ - val AclZkPath = "/kafka-acl" + val AclZkPath = ZkUtils.KafkaAclPath //notification node which gets updated with the resource name when acl on a resource is changed. - val AclChangedZkPath = "/kafka-acl-changes" + val AclChangedZkPath = ZkUtils.KafkaAclChangesPath //prefix of all the change notification sequence node. val AclChangedPrefix = "acl_changes_" http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/server/DynamicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index e0e6a03..c81ce6c 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -148,7 +148,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, } } - private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler) + private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.ConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler) /** * Begin watching for config changes http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 7a6bd63..e67e264 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -43,19 +43,40 @@ import scala.collection._ import scala.collection.JavaConverters._ object ZkUtils { - val ConsumersPath = "/consumers" - val ClusterIdPath = "/cluster/id" - val BrokerIdsPath = "/brokers/ids" - val BrokerTopicsPath = "/brokers/topics" + + + // Important: it is necessary to add any new top level Zookeeper path here + val AdminPath = "/admin" + val BrokersPath = "/brokers" + val ClusterPath = "/cluster" + val ConfigPath = "/config" val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" - val ReassignPartitionsPath = "/admin/reassign_partitions" - val DeleteTopicsPath = "/admin/delete_topics" - val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" - val BrokerSequenceIdPath = "/brokers/seqid" val IsrChangeNotificationPath = "/isr_change_notification" - val EntityConfigPath = "/config" - val EntityConfigChangesPath = "/config/changes" + val KafkaAclPath = "/kafka-acl" + val KafkaAclChangesPath = "/kafka-acl-changes" + + val ConsumersPath = "/consumers" + val ClusterIdPath = s"$ClusterPath/id" + val BrokerIdsPath = s"$BrokersPath/ids" + val BrokerTopicsPath = s"$BrokersPath/topics" + val ReassignPartitionsPath = s"$AdminPath/reassign_partitions" + val DeleteTopicsPath = s"$AdminPath/delete_topics" + val PreferredReplicaLeaderElectionPath = s"$AdminPath/preferred_replica_election" + val BrokerSequenceIdPath = s"$BrokersPath/seqid" + val ConfigChangesPath = s"$ConfigPath/changes" + + + // Important: it is necessary to add any new top level Zookeeper path to the Seq + val SecureZkRootPaths = Seq(AdminPath, + BrokersPath, + ClusterPath, + ConfigPath, + ControllerPath, + ControllerEpochPath, + IsrChangeNotificationPath, + KafkaAclPath, + KafkaAclChangesPath) def apply(zkUrl: String, sessionTimeout: Int, connectionTimeout: Int, isZkSecurityEnabled: Boolean): ZkUtils = { val (zkClient, zkConnection) = createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout) @@ -117,13 +138,13 @@ object ZkUtils { getTopicPartitionPath(topic, partitionId) + "/" + "state" def getEntityConfigRootPath(entityType: String): String = - ZkUtils.EntityConfigPath + "/" + entityType + ZkUtils.ConfigPath + "/" + entityType def getEntityConfigPath(entityType: String, entity: String): String = getEntityConfigRootPath(entityType) + "/" + entity def getEntityConfigPath(entityPath: String): String = - ZkUtils.EntityConfigPath + "/" + entityPath + ZkUtils.ConfigPath + "/" + entityPath def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic @@ -191,22 +212,13 @@ class ZkUtils(val zkClient: ZkClient, val persistentZkPaths = Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, - EntityConfigChangesPath, + ConfigChangesPath, getEntityConfigRootPath(ConfigType.Topic), getEntityConfigRootPath(ConfigType.Client), DeleteTopicsPath, BrokerSequenceIdPath, IsrChangeNotificationPath) - val securePersistentZkPaths = Seq(BrokerIdsPath, - BrokerTopicsPath, - EntityConfigChangesPath, - getEntityConfigRootPath(ConfigType.Topic), - getEntityConfigRootPath(ConfigType.Client), - DeleteTopicsPath, - BrokerSequenceIdPath, - IsrChangeNotificationPath) - val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure) def getController(): Int = { http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala index fe0204a..86db407 100644 --- a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala @@ -32,7 +32,7 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def configureSecurityBeforeServersStart() { super.configureSecurityBeforeServersStart() - zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath) + zkUtils.makeSurePersistentPathExists(ZkUtils.ConfigChangesPath) def configCommandArgs(username: String, password: String) : Array[String] = { val credentials = kafkaServerSaslMechanisms.map(m => s"$m=[iterations=4096,password=$password]") http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 8ce7c90..5215867 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -49,7 +49,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal)) // pre-create the topic config changes path to avoid a NoNodeException - zkUtils.createPersistentPath(EntityConfigChangesPath) + zkUtils.createPersistentPath(ConfigChangesPath) // modify the topic to add new partitions val numPartitionsModified = 3 http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index 8cec0c7..3b4c48e 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -153,7 +153,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { @Test def testDeleteRecursive() { info(s"zkConnect string: $zkConnect") - for (path <- zkUtils.securePersistentZkPaths) { + for (path <- ZkUtils.SecureZkRootPaths) { info(s"Creating $path") zkUtils.makeSurePersistentPathExists(path) zkUtils.createPersistentPath(s"$path/fpjwashere", "") @@ -185,7 +185,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { */ private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) { info(s"zkConnect string: $zkUrl") - for (path <- firstZk.securePersistentZkPaths) { + for (path <- ZkUtils.SecureZkRootPaths) { info(s"Creating $path") firstZk.makeSurePersistentPathExists(path) // Create a child for each znode to exercise the recurrent @@ -206,7 +206,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging { } ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl")) info("Done with migration") - for (path <- secondZk.securePersistentZkPaths) { + for (path <- ZkUtils.SecureZkRootPaths) { val listParent = secondZk.zkConnection.getAcl(path).getKey assertTrue(path, isAclCorrect(listParent, secondZk.isSecure)) http://git-wip-us.apache.org/repos/asf/kafka/blob/294018a5/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index cf0dc6f..dc30fb2 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -148,7 +148,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps) // Remove config change znodes to force quota initialization only through loading of user/client quotas - zkUtils.getChildren(ZkUtils.EntityConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.EntityConfigChangesPath + "/" + p) } + zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) } server.startup() val quotaManagers = server.apis.quotas
