Repository: kafka Updated Branches: refs/heads/trunk 3e5afbfa0 -> 72eebad43
KAFKA-3069: Fix recursion in ZkSecurityMigrator I'm also fixing a bug in the testChroot test case. Author: Flavio Junqueira <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #736 from fpj/KAFKA-3069 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/72eebad4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/72eebad4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/72eebad4 Branch: refs/heads/trunk Commit: 72eebad43d5aaf4bbd29532eedc2a793fc3ee9d5 Parents: 3e5afbf Author: Flavio Junqueira <[email protected]> Authored: Tue Jan 12 09:48:47 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jan 12 09:48:47 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/admin/ZkSecurityMigrator.scala | 26 +++++++++++--- .../security/auth/ZkAuthorizationTest.scala | 36 +++++++++++++------- 2 files changed, 46 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/72eebad4/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index 8e2f040..2080879 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -128,16 +128,33 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { private val workQueue = new LinkedBlockingQueue[Runnable] private val futures = new Queue[Future[String]] - private def setAclsRecursively(path: String) = { + private def setAcl(path: String, setPromise: Promise[String]) = { info("Setting ACL for path %s".format(path)) + zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise) + } + + private def getChildren(path: String, childrenPromise: Promise[String]) = { + info("Getting children to set ACLs for path %s".format(path)) + zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) + } + + private def setAclIndividually(path: String) = { + val setPromise = Promise[String] + futures.synchronized { + futures += setPromise.future + } + setAcl(path, setPromise) + } + + private def setAclsRecursively(path: String) = { val setPromise = Promise[String] val childrenPromise = Promise[String] futures.synchronized { futures += setPromise.future futures += childrenPromise.future } - zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise) - zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise) + setAcl(path, setPromise) + getChildren(path, childrenPromise) } private object GetChildrenCallback extends ChildrenCallback { @@ -205,11 +222,12 @@ class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging { private def run(): Unit = { try { + setAclIndividually("/") for (path <- zkUtils.securePersistentZkPaths) { debug("Going to set ACL for %s".format(path)) zkUtils.makeSurePersistentPathExists(path) + setAclsRecursively(path) } - setAclsRecursively("/") @tailrec def recurse(): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/72eebad4/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala index c4e4299..2d73f4d 100644 --- a/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/ZkAuthorizationTest.scala @@ -118,7 +118,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ def testZkMigration() { val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) try { - testMigration(unsecureZkUtils, zkUtils) + testMigration(zkConnect, unsecureZkUtils, zkUtils) } finally { unsecureZkUtils.close() } @@ -132,7 +132,7 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ def testZkAntiMigration() { val unsecureZkUtils = ZkUtils(zkConnect, 6000, 6000, false) try { - testMigration(zkUtils, unsecureZkUtils) + testMigration(zkConnect, zkUtils, unsecureZkUtils) } finally { unsecureZkUtils.close() } @@ -169,11 +169,12 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ */ @Test def testChroot { + val zkUrl = zkConnect + "/kafka" zkUtils.createPersistentPath("/kafka") - val unsecureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, false) - val secureZkUtils = ZkUtils(zkConnect + "/kafka", 6000, 6000, true) + val unsecureZkUtils = ZkUtils(zkUrl, 6000, 6000, false) + val secureZkUtils = ZkUtils(zkUrl, 6000, 6000, true) try { - testMigration(unsecureZkUtils, secureZkUtils) + testMigration(zkUrl, unsecureZkUtils, secureZkUtils) } finally { unsecureZkUtils.close() secureZkUtils.close() @@ -181,11 +182,11 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ } /** - * Exercises the migration tool. It is used by two test cases: - * testZkMigration and testZkAntiMigration. + * Exercises the migration tool. It is used in these test cases: + * testZkMigration, testZkAntiMigration, testChroot. */ - private def testMigration(firstZk: ZkUtils, secondZk: ZkUtils) { - info(s"zkConnect string: $zkConnect") + private def testMigration(zkUrl: String, firstZk: ZkUtils, secondZk: ZkUtils) { + info(s"zkConnect string: $zkUrl") for (path <- firstZk.securePersistentZkPaths) { info(s"Creating $path") firstZk.makeSurePersistentPathExists(path) @@ -193,11 +194,19 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ // traversal of the data tree firstZk.createPersistentPath(s"$path/fpjwashere", "") } + // Getting security option to determine how to verify ACLs. + // Additionally, we create the consumers znode (not in + // securePersistentZkPaths) to make sure that we don't + // add ACLs to it. val secureOpt: String = secondZk.isSecure match { - case true => "secure" - case false => "unsecure" + case true => + firstZk.createPersistentPath(ZkUtils.ConsumersPath) + "secure" + case false => + secondZk.createPersistentPath(ZkUtils.ConsumersPath) + "unsecure" } - ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkConnect")) + ZkSecurityMigrator.run(Array(s"--zookeeper.acl=$secureOpt", s"--zookeeper.connect=$zkUrl")) info("Done with migration") for (path <- secondZk.securePersistentZkPaths) { val listParent = (secondZk.zkConnection.getAcl(path)).getKey @@ -207,6 +216,9 @@ class ZkAuthorizationTest extends ZooKeeperTestHarness with Logging{ val listChild = (secondZk.zkConnection.getAcl(childPath)).getKey assertTrue(childPath, isAclCorrect(listChild, secondZk.isSecure)) } + // Check consumers path. + val consumersAcl = (firstZk.zkConnection.getAcl(ZkUtils.ConsumersPath)).getKey + assertTrue(ZkUtils.ConsumersPath, isAclCorrect(consumersAcl, false)) } /**
